From 09f226d16182c8aa98de645f6c60c9542a6a6c69 Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Mon, 16 Jul 2018 18:55:32 -0400 Subject: [PATCH 1/7] Move adaptive sampling to OSS Signed-off-by: Won Jun Jang --- .../sampling/strategystore/adaptive/cache.go | 24 + .../adaptive/calculationstrategy/interface.go | 14 + .../percentage_increase_capped_calculator.go | 46 ++ ...centage_increase_capped_calculator_test.go | 26 + .../sampling/strategystore/adaptive/config.go | 207 +++++++ .../strategystore/adaptive/processor.go | 553 ++++++++++++++++++ 6 files changed, 870 insertions(+) create mode 100644 plugin/sampling/strategystore/adaptive/cache.go create mode 100644 plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go create mode 100644 plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go create mode 100644 plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go create mode 100644 plugin/sampling/strategystore/adaptive/config.go create mode 100644 plugin/sampling/strategystore/adaptive/processor.go diff --git a/plugin/sampling/strategystore/adaptive/cache.go b/plugin/sampling/strategystore/adaptive/cache.go new file mode 100644 index 00000000000..61c7ca3102d --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/cache.go @@ -0,0 +1,24 @@ +package adaptive + +// samplingCacheEntry keeps track of the probability and whether a service-operation is using adaptive sampling +type samplingCacheEntry struct { + probability float64 + usingAdapative bool +} + +type samplingCache map[string]map[string]*samplingCacheEntry + +func (s samplingCache) Set(service, operation string, entry *samplingCacheEntry) { + if _, ok := s[service]; !ok { + s[service] = make(map[string]*samplingCacheEntry) + } + s[service][operation] = entry +} + +func (s samplingCache) Get(service, operation string) *samplingCacheEntry { + _, ok := s[service] + if !ok { + return nil + } + return 
s[service][operation] +} diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go new file mode 100644 index 00000000000..052ce79216b --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go @@ -0,0 +1,14 @@ +package calculationstrategy + +// ProbabilityCalculator calculates the new probability given the current and target QPS +type ProbabilityCalculator interface { + Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64) +} + +// Func wraps a function of appropriate signature and makes a ProbabilityCalculator from it. +type Func func(targetQPS, curQPS, prevProbability float64) (newProbability float64) + +// Calculate implements the ProbabilityCalculator interface. +func (f Func) Calculate(targetQPS, curQPS, prevProbability float64) float64 { + return f(targetQPS, curQPS, prevProbability) +} diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go new file mode 100644 index 00000000000..7cc77eebd4f --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go @@ -0,0 +1,46 @@ +package calculationstrategy + +const ( + defaultPercentageIncreaseCap = 0.5 +) + +// PercentageIncreaseCappedCalculator is a probability calculator that caps the probability +// increase to a certain percentage of the previous probability. +// +// Given prevProb = 0.1, newProb = 0.5, and cap = 0.5: +// (0.5 - 0.1)/0.1 = 400% increase. Given that our cap is 50%, the probability can only +// increase to 0.15. +// +// Given prevProb = 0.4, newProb = 0.5, and cap = 0.5: +// (0.5 - 0.4)/0.4 = 25% increase. Given that this is below our cap of 50%, the probability +// can increase to 0.5. 
+type PercentageIncreaseCappedCalculator struct { + percentageIncreaseCap float64 +} + +// NewPercentageIncreaseCappedCalculator returns a new percentage increase capped calculator. +func NewPercentageIncreaseCappedCalculator(percentageIncreaseCap float64) PercentageIncreaseCappedCalculator { + if percentageIncreaseCap == 0 { + percentageIncreaseCap = defaultPercentageIncreaseCap + } + return PercentageIncreaseCappedCalculator{ + percentageIncreaseCap: percentageIncreaseCap, + } +} + +// Calculate calculates the new probability. +func (c PercentageIncreaseCappedCalculator) Calculate(targetQPS, curQPS, prevProbability float64) float64 { + factor := targetQPS / curQPS + newProbability := prevProbability * factor + // If curQPS is lower than the targetQPS, we need to increase the probability slowly to + // defend against oversampling. + // Else if curQPS is higher than the targetQPS, jump directly to the newProbability to + // defend against oversampling. + if factor > 1.0 { + percentIncrease := (newProbability - prevProbability) / prevProbability + if percentIncrease > c.percentageIncreaseCap { + newProbability = prevProbability + (prevProbability * c.percentageIncreaseCap) + } + } + return newProbability +} diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go new file mode 100644 index 00000000000..6f4cd242e1f --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go @@ -0,0 +1,26 @@ +package calculationstrategy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculate(t *testing.T) { + calculator := NewPercentageIncreaseCappedCalculator(0) + tests := []struct { + targetQPS float64 + curQPS float64 + oldProbability float64 + expectedProbability float64 + testName string + }{ + {1.0, 2.0, 0.1, 0.05, 
"test1"}, + {1.0, 0.5, 0.1, 0.15, "test2"}, + {1.0, 0.8, 0.1, 0.125, "test3"}, + } + for _, tt := range tests { + probability := calculator.Calculate(tt.targetQPS, tt.curQPS, tt.oldProbability) + assert.InDelta(t, probability, tt.expectedProbability, 0.0001, tt.testName) + } +} diff --git a/plugin/sampling/strategystore/adaptive/config.go b/plugin/sampling/strategystore/adaptive/config.go new file mode 100644 index 00000000000..50b80e7119f --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/config.go @@ -0,0 +1,207 @@ +package adaptive + +import ( + "time" + + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/atomic" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/distributedlock" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +const ( + defaultAggregationInterval = time.Minute + defaultTargetQPS = 1 + defaultEquivalenceThreshold = 0.3 + defaultLookbackQPSCount = 1 + defaultCalculationInterval = time.Minute + defaultLookbackInterval = time.Minute * 10 + defaultDelay = time.Minute * 2 + defaultSamplingProbability = 0.001 + defaultMinSamplingProbability = 0.00001 // once in 100 thousand requests + defaultLowerBoundTracesPerSecond = 1.0 / (1 * float64(time.Minute/time.Second)) // once every 1 minute + defaultLeaderLeaseRefreshInterval = 5 * time.Second + defaultFollowerLeaseRefreshInterval = 60 * time.Second +) + +// ThroughputAggregatorConfig is the configuration for the ThroughputAggregator. +type ThroughputAggregatorConfig struct { + // AggregationInterval determines how often throughput is aggregated and written to storage. + AggregationInterval time.Duration `yaml:"aggregation_interval"` +} + +// ProcessorConfig is the configuration for the SamplingProcessor. +type ProcessorConfig struct { + // TargetQPS is the target sampled qps for all operations. + TargetQPS float64 `yaml:"target_qps"` + + // QPSEquivalenceThreshold is the acceptable amount of deviation for the operation QPS from the `targetQPS`, + // ie. 
[targetQPS-equivalenceThreshold, targetQPS+equivalenceThreshold] is the acceptable targetQPS range. + // Increase this to reduce the amount of fluctuation in the probability calculation. + QPSEquivalenceThreshold float64 `yaml:"qps_equivalence_threshold"` + + // LookbackQPSCount determines how many previous operation QPS are used in calculating the weighted QPS, + // ie. if LookbackQPSCount is 1, then only the most recent QPS will be used in calculating the weighted QPS. + LookbackQPSCount int `yaml:"lookback_qps_count"` + + // CalculationInterval determines the interval each bucket represents, ie. if an interval is + // 1 minute, the bucket will contain 1 minute of throughput data for all services. + CalculationInterval time.Duration `yaml:"calculation_interval"` + + // LookbackInterval is the total amount of throughput data used to calculate probabilities. + LookbackInterval time.Duration `yaml:"lookback_interval"` + + // Delay is the amount of time to delay probability generation by, ie. if the calculationInterval + // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time + // we'll have [now()-12,now()-2] range of throughput data in memory to base the calculations + // off of. + Delay time.Duration `yaml:"delay"` + + // DefaultSamplingProbability is the initial sampling probability for all new operations. + DefaultSamplingProbability float64 `yaml:"default_sampling_probability"` + + // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling + // probability will be bound [MinSamplingProbability, 1.0] + MinSamplingProbability float64 `yaml:"min_sampling_probability"` + + // LowerBoundTracesPerSecond determines the lower bound number of traces that are sampled per second. + // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do + // its best to sample at least one trace a minute for an operation. 
This is useful for a low QPS operation + // that is never sampled by the probabilistic sampler and depends on some time based element. + LowerBoundTracesPerSecond float64 `yaml:"lower_bound_traces_per_second"` + + // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before + // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval + // to reduce lock thrashing. + LeaderLeaseRefreshInterval time.Duration `yaml:"leader_lease_refresh_interval"` + + // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower + // (ie. failed to gain the leader lock). + FollowerLeaseRefreshInterval time.Duration `yaml:"follower_lease_refresh_interval"` + + // Mutable is a configuration holder that holds configurations that could dynamically change during + // the lifetime of the processor. + Mutable MutableProcessorConfigurator +} + +// MutableProcessorConfigurator is a mutable config holder for certain configs that can change during the lifetime +// of the processor. +type MutableProcessorConfigurator interface { + GetTargetQPS() float64 + GetQPSEquivalenceThreshold() float64 +} + +// ImmutableProcessorConfig is a MutableProcessorConfigurator that doesn't dynamically update (it can be updated, but +// doesn't guarantee thread safety). +type ImmutableProcessorConfig struct { + TargetQPS float64 `json:"target_qps"` + QPSEquivalenceThreshold float64 `json:"qps_equivalence_threshold"` +} + +// GetTargetQPS implements MutableProcessorConfigurator#GetTargetQPS +func (d ImmutableProcessorConfig) GetTargetQPS() float64 { + return d.TargetQPS +} + +// GetQPSEquivalenceThreshold implements MutableProcessorConfigurator#GetQPSEquivalenceThreshold +func (d ImmutableProcessorConfig) GetQPSEquivalenceThreshold() float64 { + return d.QPSEquivalenceThreshold +} + +// MutableProcessorConfig is a MutableProcessorConfigurator that is thread safe and dynamically updates. 
+type MutableProcessorConfig struct { + targetQPS *atomic.Float64 + qpsEquivalenceThreshold *atomic.Float64 +} + +// NewMutableProcessorConfig returns a MutableProcessorConfigurator that dynamically updates. +func NewMutableProcessorConfig(config ImmutableProcessorConfig) *MutableProcessorConfig { + return &MutableProcessorConfig{ + targetQPS: atomic.NewFloat64(config.GetTargetQPS()), + qpsEquivalenceThreshold: atomic.NewFloat64(config.GetQPSEquivalenceThreshold()), + } +} + +// Update updates the configs. +func (d *MutableProcessorConfig) Update(config ImmutableProcessorConfig) { + d.targetQPS.Store(config.GetTargetQPS()) + d.qpsEquivalenceThreshold.Store(config.GetQPSEquivalenceThreshold()) +} + +// GetTargetQPS implements MutableProcessorConfigurator#GetTargetQPS. +func (d *MutableProcessorConfig) GetTargetQPS() float64 { + return d.targetQPS.Load() +} + +// GetQPSEquivalenceThreshold implements MutableProcessorConfigurator#GetQPSEquivalenceThreshold. +func (d *MutableProcessorConfig) GetQPSEquivalenceThreshold() float64 { + return d.qpsEquivalenceThreshold.Load() +} + +// Builder struct to hold configurations. +type Builder struct { + ThroughputAggregator ThroughputAggregatorConfig `yaml:"throughput_aggregator"` + SamplingProcessor ProcessorConfig `yaml:"sampling_processor"` + + metrics metrics.Factory + logger *zap.Logger +} + +// NewBuilder creates a default builder. 
+func NewBuilder() *Builder { + return &Builder{ + ThroughputAggregator: ThroughputAggregatorConfig{ + AggregationInterval: defaultAggregationInterval, + }, + SamplingProcessor: ProcessorConfig{ + LookbackQPSCount: defaultLookbackQPSCount, + CalculationInterval: defaultCalculationInterval, + LookbackInterval: defaultLookbackInterval, + Delay: defaultDelay, + DefaultSamplingProbability: defaultSamplingProbability, + MinSamplingProbability: defaultMinSamplingProbability, + LowerBoundTracesPerSecond: defaultLowerBoundTracesPerSecond, + LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, + FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, + Mutable: ImmutableProcessorConfig{ + TargetQPS: defaultTargetQPS, + QPSEquivalenceThreshold: defaultEquivalenceThreshold, + }, + }, + } +} + +// WithMetricsFactory sets metrics factory. +func (b *Builder) WithMetricsFactory(m metrics.Factory) *Builder { + b.metrics = m + return b +} + +// WithLogger sets logger. +func (b *Builder) WithLogger(l *zap.Logger) *Builder { + b.logger = l + return b +} + +func (b *Builder) applyDefaults() { + if b.metrics == nil { + b.metrics = metrics.NullFactory + } + if b.logger == nil { + b.logger = zap.NewNop() + } +} + +// NewThroughputAggregator creates and returns a ThroughputAggregator. +func (b *Builder) NewThroughputAggregator(storage samplingstore.Store) (Aggregator, error) { + b.applyDefaults() + return NewAggregator(b.metrics, b.ThroughputAggregator.AggregationInterval, storage), nil +} + +// NewProcessor creates and returns a SamplingProcessor. 
+func (b *Builder) NewProcessor(hostname string, storage samplingstore.Store, lock distributedlock.Lock) (Processor, error) { + b.applyDefaults() + return NewProcessor(b.SamplingProcessor, hostname, storage, lock, b.metrics, b.logger) +} diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go new file mode 100644 index 00000000000..b2b254f4c36 --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/processor.go @@ -0,0 +1,553 @@ +package adaptive + +import ( + "errors" + "math" + "math/rand" + "sync" + "time" + + "github.com/jaegertracing/jaeger/thrift-gen/sampling" + "github.com/uber-go/atomic" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/distributedlock" + "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive/calculationstrategy" + "github.com/jaegertracing/jaeger/storage/samplingstore" +) + +const ( + maxSamplingProbability = 1.0 + + samplingLock = "sampling_lock" + + getThroughputErrMsg = "Failed to get throughput from storage" + acquireLockErrMsg = "Failed to acquire lock" + + defaultFollowerProbabilityInterval = 20 * time.Second + + // The number of past entries for samplingCache the leader keeps in memory + serviceCacheSize = 25 +) + +var ( + errIntervals = errors.New("CalculationInterval must be less than LookbackInterval") + errNonZeroIntervals = errors.New("CalculationInterval and LookbackInterval must be greater than 0") + errLockIntervals = errors.New("FollowerLeaseRefreshInterval cannot be less than LeaderLeaseRefreshInterval") + errLookbackQPSCount = errors.New("LookbackQPSCount cannot be less than 1") +) + +type serviceOperationThroughput map[string]map[string]*samplingstore.Throughput + +type throughputBucket struct { + throughput serviceOperationThroughput + interval time.Duration + endTime time.Time +} + +// Processor retrieves service throughput over a look back interval and calculates sampling probabilities 
+// per operation such that each operation is sampled at a specified target QPS. It achieves this by +// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput +// and generating a probability to match the targetQPS. +type Processor interface { + // GetSamplingStrategyResponses returns the sampling strategy response probabilities of a service. + GetSamplingStrategyResponses(service string) sampling.SamplingStrategyResponse + + // Start initializes and starts the sampling processor which regularly calculates sampling probabilities. + Start() + + // Stop stops the processor from calculating probabilities. + Stop() +} + +type processor struct { + sync.RWMutex + ProcessorConfig + + // flag used to determine if this processor has the leader lock. + atomic.Bool + + storage samplingstore.Store + lock distributedlock.Lock + acquireLockStop chan struct{} + calculationStop chan struct{} + updateProbabilitiesStop chan struct{} + hostname string + + // buckets is the number of `calculationInterval` buckets used to calculate the probabilities. + // It is calculated as lookbackInterval / calculationInterval. + buckets int + + // probabilities contains the latest calculated sampling probabilities for service operations. + probabilities samplingstore.ServiceOperationProbabilities + + // qps contains the latest calculated qps for service operations; the calculation is essentially + // throughput / CalculationInterval. + qps samplingstore.ServiceOperationQPS + + // throughputs slice of `buckets` size that stores the aggregated throughput. The latest throughput + // is stored at the head of the slice. + throughputs []*throughputBucket + + // strategyResponses contains the sampling strategies for every service. 
+ strategyResponses map[string]*sampling.SamplingStrategyResponse + + logger *zap.Logger + + weightsCache *weightsCache + + probabilityCalculator calculationstrategy.ProbabilityCalculator + + // followerProbabilityInterval determines how often the follower processor updates its probabilities. + // Given only the leader writes probabilities, the followers need to fetch the probabilities into + // cache. + followerProbabilityInterval time.Duration + + serviceCache []samplingCache + + operationsCalculatedGauge metrics.Gauge + calculateProbabilitiesLatency metrics.Timer +} + +// NewProcessor creates a new sampling processor that generates sampling rates for service operations +func NewProcessor( + config ProcessorConfig, + hostname string, + storage samplingstore.Store, + lock distributedlock.Lock, + metricsFactory metrics.Factory, + logger *zap.Logger, +) (Processor, error) { + if config.LookbackInterval < config.CalculationInterval { + return nil, errIntervals + } + if config.CalculationInterval == 0 || config.LookbackInterval == 0 { + return nil, errNonZeroIntervals + } + if config.FollowerLeaseRefreshInterval < config.LeaderLeaseRefreshInterval { + return nil, errLockIntervals + } + if config.LookbackQPSCount < 1 { + return nil, errLookbackQPSCount + } + buckets := int(config.LookbackInterval / config.CalculationInterval) + metricsFactory = metricsFactory.Namespace("adaptive_sampling_processor", nil) + return &processor{ + ProcessorConfig: config, + storage: storage, + buckets: buckets, + probabilities: make(samplingstore.ServiceOperationProbabilities), + qps: make(samplingstore.ServiceOperationQPS), + hostname: hostname, + strategyResponses: make(map[string]*sampling.SamplingStrategyResponse), + logger: logger, + lock: lock, + // TODO make weightsCache and probabilityCalculator configurable + weightsCache: newWeightsCache(), + probabilityCalculator: calculationstrategy.NewPercentageIncreaseCappedCalculator(1.0), + followerProbabilityInterval: 
defaultFollowerProbabilityInterval, + serviceCache: []samplingCache{}, + operationsCalculatedGauge: metricsFactory.Gauge("operations_calculated", nil), + calculateProbabilitiesLatency: metricsFactory.Timer("calculate_probabilities", nil), + }, nil +} + +func (p *processor) GetSamplingStrategyResponses(service string) sampling.SamplingStrategyResponse { + p.RLock() + defer p.RUnlock() + if strategy, ok := p.strategyResponses[service]; ok { + return *strategy + } + return p.generateDefaultSamplingStrategyResponse() +} + +func (p *processor) Start() { + p.logger.Info("Starting sampling processor") + p.acquireLockStop = make(chan struct{}) + p.calculationStop = make(chan struct{}) + p.updateProbabilitiesStop = make(chan struct{}) + p.setLeader(false) + p.loadProbabilities() + p.generateStrategyResponses() + go p.runAcquireLockLoop() + go p.runCalculationLoop() + go p.runUpdateProbabilitiesLoop() +} + +func (p *processor) Stop() { + p.logger.Info("Stopping sampling processor") + close(p.acquireLockStop) + close(p.calculationStop) + close(p.updateProbabilitiesStop) +} + +func (p *processor) loadProbabilities() { + // TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization + probabilities, err := p.storage.GetLatestProbabilities() + if err != nil { + p.logger.Warn("Failed to initialize probabilities", zap.Error(err)) + return + } + p.Lock() + defer p.Unlock() + p.probabilities = probabilities +} + +// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it, +// otherwise it sleeps and attempts to gain the lock again. 
+func (p *processor) runAcquireLockLoop() { + addJitter(p.LeaderLeaseRefreshInterval) + ticker := time.NewTicker(p.acquireLock()) + for { + select { + case <-ticker.C: + ticker.Stop() + ticker = time.NewTicker(p.acquireLock()) + case <-p.acquireLockStop: + ticker.Stop() + return + } + } +} + +// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry. +func (p *processor) acquireLock() time.Duration { + if acquiredLeaderLock, err := p.lock.Acquire(samplingLock); err == nil { + p.setLeader(acquiredLeaderLock) + } else { + p.logger.Error(acquireLockErrMsg, zap.Error(err)) + } + if p.isLeader() { + // If this host holds the leader lock, retry with a shorter cadence + // to retain the leader lease. + return p.LeaderLeaseRefreshInterval + } + // If this host failed to acquire the leader lock, retry with a longer cadence + return p.FollowerLeaseRefreshInterval +} + +// runUpdateProbabilitiesLoop starts a loop that reads probabilities from storage. +// The follower updates its local cache with the latest probabilities and serves them. +func (p *processor) runUpdateProbabilitiesLoop() { + addJitter(p.followerProbabilityInterval) + ticker := time.NewTicker(p.followerProbabilityInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Only load probabilities if this processor doesn't hold the leader lock + if !p.isLeader() { + p.loadProbabilities() + p.generateStrategyResponses() + } + case <-p.updateProbabilitiesStop: + return + } + } +} + +func (p *processor) isLeader() bool { + return p.Load() +} + +func (p *processor) setLeader(isLeader bool) { + p.Store(isLeader) +} + +// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader +// lock were to die, then all other collectors can potentially wait for a full cycle before +// trying to acquire the lock. With jitter, we can reduce the average amount of time before a +// new leader is elected. 
Furthermore, jitter can be used to spread out read load on storage. +func addJitter(jitterAmount time.Duration) { + randomTime := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) + time.Sleep(randomTime) +} + +func (p *processor) runCalculationLoop() { + lastCheckedTime := time.Now().Add(p.Delay * -1) + p.initializeThroughput(lastCheckedTime) + // NB: the first tick will be slightly delayed by the initializeThroughput call. + ticker := time.NewTicker(p.CalculationInterval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + endTime := time.Now().Add(p.Delay * -1) + startTime := lastCheckedTime + throughput, err := p.storage.GetThroughput(startTime, endTime) + if err != nil { + p.logger.Error(getThroughputErrMsg, zap.Error(err)) + break + } + aggregatedThroughput := p.aggregateThroughput(throughput) + p.prependThroughputBucket(&throughputBucket{ + throughput: aggregatedThroughput, + interval: endTime.Sub(startTime), + endTime: endTime, + }) + lastCheckedTime = endTime + // Load the latest throughput so that if this host ever becomes leader, it + // has the throughput ready in memory. However, only run the actual calculations + // if this host becomes leader. + // TODO fill the throughput buffer only when we're leader + if p.isLeader() { + startTime := time.Now() + probabilities, qps := p.calculateProbabilitiesAndQPS() + p.Lock() + p.probabilities = probabilities + p.qps = qps + p.Unlock() + // NB: This has the potential of running into a race condition if the calculationInterval + // is set to an extremely low value. The worst case scenario is that probabilities is calculated + // and swapped more than once before generateStrategyResponses() and saveProbabilities() are called. + // This will result in one or more batches of probabilities not being saved which is completely + // fine. This race condition should not ever occur anyway since the calculation interval will + // be way longer than the time to run the calculations. 
+ p.generateStrategyResponses() + p.calculateProbabilitiesLatency.Record(time.Now().Sub(startTime)) + go p.saveProbabilitiesAndQPS() + } + case <-p.calculationStop: + return + } + } +} + +func (p *processor) saveProbabilitiesAndQPS() { + p.RLock() + defer p.RUnlock() + if err := p.storage.InsertProbabilitiesAndQPS(p.hostname, p.probabilities, p.qps); err != nil { + p.logger.Warn("Could not save probabilities", zap.Error(err)) + } +} + +func (p *processor) prependThroughputBucket(bucket *throughputBucket) { + p.throughputs = append([]*throughputBucket{bucket}, p.throughputs...) + if len(p.throughputs) > p.buckets { + p.throughputs = p.throughputs[0:p.buckets] + } +} + +// aggregateThroughput aggregates operation throughput from different buckets into one. +func (p *processor) aggregateThroughput(throughputs []*samplingstore.Throughput) serviceOperationThroughput { + aggregatedThroughput := make(serviceOperationThroughput) + for _, throughput := range throughputs { + service := throughput.Service + operation := throughput.Operation + if _, ok := aggregatedThroughput[service]; !ok { + aggregatedThroughput[service] = make(map[string]*samplingstore.Throughput) + } + if t, ok := aggregatedThroughput[service][operation]; ok { + t.Count += throughput.Count + t.Probabilities = combineProbabilities(t.Probabilities, throughput.Probabilities) + } else { + aggregatedThroughput[service][operation] = throughput + } + } + return aggregatedThroughput +} + +func (p *processor) initializeThroughput(endTime time.Time) { + for i := 0; i < p.buckets; i++ { + startTime := endTime.Add(p.CalculationInterval * -1) + throughput, err := p.storage.GetThroughput(startTime, endTime) + if err != nil && p.logger != nil { + p.logger.Error(getThroughputErrMsg, zap.Error(err)) + return + } + if len(throughput) == 0 { + return + } + aggregatedThroughput := p.aggregateThroughput(throughput) + p.throughputs = append(p.throughputs, &throughputBucket{ + throughput: aggregatedThroughput, + interval: 
p.CalculationInterval, + endTime: endTime, + }) + endTime = startTime + } +} + +type serviceOperationQPS map[string]map[string][]float64 + +func (p *processor) generateOperationQPS() serviceOperationQPS { + // TODO previous qps buckets have already been calculated, just need to calculate latest batch and append them + // where necessary and throw out the oldest batch. Edge case #buckets < p.buckets, then we shouldn't throw out + qps := make(serviceOperationQPS) + for _, bucket := range p.throughputs { + for svc, operations := range bucket.throughput { + if _, ok := qps[svc]; !ok { + qps[svc] = make(map[string][]float64) + } + for op, throughput := range operations { + if len(qps[svc][op]) >= p.LookbackQPSCount { + continue + } + qps[svc][op] = append(qps[svc][op], calculateQPS(throughput.Count, bucket.interval)) + } + } + } + return qps +} + +func calculateQPS(count int64, interval time.Duration) float64 { + seconds := float64(interval) / float64(time.Second) + return float64(count) / seconds +} + +// calculateWeightedQPS calculates the weighted qps of the slice allQPS where weights are biased towards more recent +// qps. This function assumes that the most recent qps is at the head of the slice. +func (p *processor) calculateWeightedQPS(allQPS []float64) float64 { + if len(allQPS) == 0 { + return 0 + } + weights := p.weightsCache.getWeights(len(allQPS)) + var qps float64 + for i := 0; i < len(allQPS); i++ { + qps += allQPS[i] * weights[i] + } + return qps +} + +func (p *processor) prependServiceCache() { + p.serviceCache = append([]samplingCache{make(samplingCache)}, p.serviceCache...) 
+ if len(p.serviceCache) > serviceCacheSize { + p.serviceCache = p.serviceCache[0:serviceCacheSize] + } +} + +func (p *processor) calculateProbabilitiesAndQPS() (samplingstore.ServiceOperationProbabilities, samplingstore.ServiceOperationQPS) { + p.prependServiceCache() + retProbabilities := make(samplingstore.ServiceOperationProbabilities) + retQPS := make(samplingstore.ServiceOperationQPS) + svcOpQPS := p.generateOperationQPS() + totalOperations := int64(0) + for svc, opQPS := range svcOpQPS { + if _, ok := retProbabilities[svc]; !ok { + retProbabilities[svc] = make(map[string]float64) + } + if _, ok := retQPS[svc]; !ok { + retQPS[svc] = make(map[string]float64) + } + for op, qps := range opQPS { + totalOperations++ + avgQPS := p.calculateWeightedQPS(qps) + retQPS[svc][op] = avgQPS + retProbabilities[svc][op] = p.calculateProbability(svc, op, avgQPS) + } + } + p.operationsCalculatedGauge.Update(totalOperations) + return retProbabilities, retQPS +} + +func (p *processor) calculateProbability(service, operation string, qps float64) float64 { + oldProbability := p.DefaultSamplingProbability + // TODO: is this loop overly expensive? + p.RLock() + if opProbabilities, ok := p.probabilities[service]; ok { + if probability, ok := opProbabilities[operation]; ok { + oldProbability = probability + } + } + latestThroughput := p.throughputs[0].throughput + p.RUnlock() + + usingAdaptiveSampling := p.usingAdaptiveSampling(oldProbability, service, operation, latestThroughput) + p.serviceCache[0].Set(service, operation, &samplingCacheEntry{ + probability: oldProbability, + usingAdapative: usingAdaptiveSampling, + }) + + // Short circuit if the qps is close enough to targetQPS or if the service isn't using + // adaptive sampling. 
+ targetQPS := p.Mutable.GetTargetQPS() + if math.Abs(qps-targetQPS) < p.Mutable.GetQPSEquivalenceThreshold() || !usingAdaptiveSampling { + return oldProbability + } + var newProbability float64 + if floatEquals(qps, 0) { + // Edge case, we double the sampling probability if the QPS is 0 so that we force the service + // to at least sample one span probabilistically + newProbability = oldProbability * 2.0 + } else { + newProbability = p.probabilityCalculator.Calculate(targetQPS, qps, oldProbability) + } + return math.Min(maxSamplingProbability, math.Max(p.MinSamplingProbability, newProbability)) +} + +func combineProbabilities(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} { + for probability := range p2 { + p1[probability] = struct{}{} + } + return p1 +} + +func (p *processor) usingAdaptiveSampling(probability float64, service, operation string, throughput serviceOperationThroughput) bool { + if floatEquals(probability, p.DefaultSamplingProbability) { + // If the service is seen for the first time, assume it's using adaptive sampling (ie prob == defaultProb). + // Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal + // the defaultProb so the logic will fall through. + return true + } + var opThroughput *samplingstore.Throughput + svcThroughput, ok := throughput[service] + if ok { + opThroughput = svcThroughput[operation] + } + if opThroughput != nil { + f := truncateFloat(probability) + _, ok = opThroughput.Probabilities[f] + return ok + } + // By this point, we know that there's no recorded throughput for this operation for this round + // of calculation. Check the previous bucket to see if this operation was using adaptive sampling + // before. 
+ if len(p.serviceCache) > 1 { + if e := p.serviceCache[1].Get(service, operation); e != nil { + return e.usingAdapative && e.probability != p.DefaultSamplingProbability + } + } + return false +} + +// generateStrategyResponses generates a SamplingStrategyResponse from the calculated sampling probabilities. +func (p *processor) generateStrategyResponses() { + p.RLock() + strategies := make(map[string]*sampling.SamplingStrategyResponse) + for svc, opProbabilities := range p.probabilities { + opStrategies := make([]*sampling.OperationSamplingStrategy, len(opProbabilities)) + var idx int + for op, probability := range opProbabilities { + opStrategies[idx] = &sampling.OperationSamplingStrategy{ + Operation: op, + ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{ + SamplingRate: probability, + }, + } + idx++ + } + strategy := p.generateDefaultSamplingStrategyResponse() + strategy.OperationSampling.PerOperationStrategies = opStrategies + strategies[svc] = &strategy + } + p.RUnlock() + + p.Lock() + defer p.Unlock() + p.strategyResponses = strategies +} + +func (p *processor) generateDefaultSamplingStrategyResponse() sampling.SamplingStrategyResponse { + return sampling.SamplingStrategyResponse{ + StrategyType: sampling.SamplingStrategyType_PROBABILISTIC, + OperationSampling: &sampling.PerOperationSamplingStrategies{ + DefaultSamplingProbability: p.DefaultSamplingProbability, + DefaultLowerBoundTracesPerSecond: p.LowerBoundTracesPerSecond, + }, + } +} + +func floatEquals(a, b float64) bool { + return math.Abs(a-b) < 0.0000000001 +} From 342ee1ac552d92761e126300a9547fd05b829910 Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Mon, 6 Aug 2018 23:22:04 -0400 Subject: [PATCH 2/7] more Signed-off-by: Won Jun Jang --- pkg/testutils/logger.go | 15 + pkg/testutils/logger_test.go | 21 + .../sampling/strategystore/adaptive/cache.go | 24 - .../adaptive/calculationstrategy/interface.go | 14 + .../percentage_increase_capped_calculator.go | 14 + 
...centage_increase_capped_calculator_test.go | 14 + .../sampling/strategystore/adaptive/config.go | 207 ------- .../adaptive/internal/leader_election.go | 117 ++++ .../adaptive/internal/leader_election_test.go | 107 ++++ .../internal/mocks/ElectionParticipant.go | 37 ++ .../strategystore/adaptive/processor.go | 553 ------------------ 11 files changed, 339 insertions(+), 784 deletions(-) delete mode 100644 plugin/sampling/strategystore/adaptive/cache.go delete mode 100644 plugin/sampling/strategystore/adaptive/config.go create mode 100644 plugin/sampling/strategystore/adaptive/internal/leader_election.go create mode 100644 plugin/sampling/strategystore/adaptive/internal/leader_election_test.go create mode 100644 plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go delete mode 100644 plugin/sampling/strategystore/adaptive/processor.go diff --git a/pkg/testutils/logger.go b/pkg/testutils/logger.go index 87ad527aa1e..141d36a86a1 100644 --- a/pkg/testutils/logger.go +++ b/pkg/testutils/logger.go @@ -16,6 +16,7 @@ package testutils import ( "encoding/json" + "strings" "sync" "go.uber.org/zap" @@ -88,3 +89,17 @@ func (b *Buffer) Write(p []byte) (int, error) { defer b.Unlock() return b.Buffer.Write(p) } + +// LogMatcher is a helper func that returns true if the subStr appears more than 'occurrences' times in the logs. 
+var LogMatcher = func(occurrences int, subStr string, logs []string) bool { + if len(logs) < occurrences { + return false + } + var count int + for _, log := range logs { + if strings.Contains(log, subStr) { + count++ + } + } + return count >= occurrences +} diff --git a/pkg/testutils/logger_test.go b/pkg/testutils/logger_test.go index 402bdfe5330..2d37c1e67be 100644 --- a/pkg/testutils/logger_test.go +++ b/pkg/testutils/logger_test.go @@ -15,6 +15,7 @@ package testutils import ( + "fmt" "sync" "testing" @@ -66,3 +67,23 @@ func TestRaceCondition(t *testing.T) { close(start) finish.Wait() } + +func TestLogMatcher(t *testing.T) { + tests := []struct { + occurences int + subStr string + logs []string + expected bool + }{ + {occurences: 1, expected: false}, + {occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true}, + {occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false}, + {occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true}, + } + for i, tt := range tests { + test := tt + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + assert.Equal(t, test.expected, LogMatcher(test.occurences, test.subStr, test.logs)) + }) + } +} diff --git a/plugin/sampling/strategystore/adaptive/cache.go b/plugin/sampling/strategystore/adaptive/cache.go deleted file mode 100644 index 61c7ca3102d..00000000000 --- a/plugin/sampling/strategystore/adaptive/cache.go +++ /dev/null @@ -1,24 +0,0 @@ -package adaptive - -// samplingCacheEntry keeps track of the probability and whether a service-operation is using adaptive sampling -type samplingCacheEntry struct { - probability float64 - usingAdapative bool -} - -type samplingCache map[string]map[string]*samplingCacheEntry - -func (s samplingCache) Set(service, operation string, entry *samplingCacheEntry) { - if _, ok := s[service]; !ok { - s[service] = make(map[string]*samplingCacheEntry) - } - s[service][operation] = entry -} - -func (s samplingCache) Get(service, operation string) 
*samplingCacheEntry { - _, ok := s[service] - if !ok { - return nil - } - return s[service][operation] -} diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go index 052ce79216b..dae7aced57e 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go @@ -1,3 +1,17 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package calculationstrategy // ProbabilityCalculator calculates the new probability given the current and target QPS diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go index 7cc77eebd4f..3a8f73a2d3a 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go @@ -1,3 +1,17 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package calculationstrategy const ( diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go index 6f4cd242e1f..5fc0aa0c210 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go @@ -1,3 +1,17 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ package calculationstrategy import ( diff --git a/plugin/sampling/strategystore/adaptive/config.go b/plugin/sampling/strategystore/adaptive/config.go deleted file mode 100644 index 50b80e7119f..00000000000 --- a/plugin/sampling/strategystore/adaptive/config.go +++ /dev/null @@ -1,207 +0,0 @@ -package adaptive - -import ( - "time" - - "github.com/uber/jaeger-lib/metrics" - "go.uber.org/atomic" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/pkg/distributedlock" - "github.com/jaegertracing/jaeger/storage/samplingstore" -) - -const ( - defaultAggregationInterval = time.Minute - defaultTargetQPS = 1 - defaultEquivalenceThreshold = 0.3 - defaultLookbackQPSCount = 1 - defaultCalculationInterval = time.Minute - defaultLookbackInterval = time.Minute * 10 - defaultDelay = time.Minute * 2 - defaultSamplingProbability = 0.001 - defaultMinSamplingProbability = 0.00001 // once in 100 thousand requests - defaultLowerBoundTracesPerSecond = 1.0 / (1 * float64(time.Minute/time.Second)) // once every 1 minute - defaultLeaderLeaseRefreshInterval = 5 * time.Second - defaultFollowerLeaseRefreshInterval = 60 * time.Second -) - -// ThroughputAggregatorConfig is the configuration for the ThroughputAggregator. -type ThroughputAggregatorConfig struct { - // AggregationInterval determines how often throughput is aggregated and written to storage. - AggregationInterval time.Duration `yaml:"aggregation_interval"` -} - -// ProcessorConfig is the configuration for the SamplingProcessor. -type ProcessorConfig struct { - // TargetQPS is the target sampled qps for all operations. - TargetQPS float64 `yaml:"target_qps"` - - // QPSEquivalenceThreshold is the acceptable amount of deviation for the operation QPS from the `targetQPS`, - // ie. [targetQPS-equivalenceThreshold, targetQPS+equivalenceThreshold] is the acceptable targetQPS range. - // Increase this to reduce the amount of fluctuation in the probability calculation. 
- QPSEquivalenceThreshold float64 `yaml:"qps_equivalence_threshold"` - - // LookbackQPSCount determines how many previous operation QPS are used in calculating the weighted QPS, - // ie. if LookbackQPSCount is 1, the only the most recent QPS will be used in calculating the weighted QPS. - LookbackQPSCount int `yaml:"lookback_qps_count"` - - // CalculationInterval determines the interval each bucket represents, ie. if an interval is - // 1 minute, the bucket will contain 1 minute of throughput data for all services. - CalculationInterval time.Duration `yaml:"calculation_interval"` - - // LookbackInterval is the total amount of throughput data used to calculate probabilities. - LookbackInterval time.Duration `yaml:"lookback_interval"` - - // Delay is the amount of time to delay probability generation by, ie. if the calculationInterval - // is 1 minute, the number of buckets is 10, and the delay is 2 minutes, then at one time - // we'll have [now()-12,now()-2] range of throughput data in memory to base the calculations - // off of. - Delay time.Duration `yaml:"delay"` - - // DefaultSamplingProbability is the initial sampling probability for all new operations. - DefaultSamplingProbability float64 `yaml:"default_sampling_probability"` - - // MinSamplingProbability is the minimum sampling probability for all operations. ie. the calculated sampling - // probability will be bound [MinSamplingProbability, 1.0] - MinSamplingProbability float64 `yaml:"min_sampling_probability"` - - // LowerBoundTracesPerSecond determines the lower bound number of traces that are sampled per second. - // For example, if the value is 0.01666666666 (one every minute), then the sampling processor will do - // its best to sample at least one trace a minute for an operation. This is useful for a low QPS operation - // that is never sampled by the probabilistic sampler and depends on some time based element. 
- LowerBoundTracesPerSecond float64 `yaml:"lower_bound_traces_per_second"` - - // LeaderLeaseRefreshInterval is the duration to sleep if this processor is elected leader before - // attempting to renew the lease on the leader lock. NB. This should be less than FollowerLeaseRefreshInterval - // to reduce lock thrashing. - LeaderLeaseRefreshInterval time.Duration `yaml:"leader_lease_refresh_interval"` - - // FollowerLeaseRefreshInterval is the duration to sleep if this processor is a follower - // (ie. failed to gain the leader lock). - FollowerLeaseRefreshInterval time.Duration `yaml:"follower_lease_refresh_interval"` - - // Mutable is a configuration holder that holds configurations that could dynamically change during - // the lifetime of the processor. - Mutable MutableProcessorConfigurator -} - -// MutableProcessorConfigurator is a mutable config holder for certain configs that can change during the lifetime -// of the processor. -type MutableProcessorConfigurator interface { - GetTargetQPS() float64 - GetQPSEquivalenceThreshold() float64 -} - -// ImmutableProcessorConfig is a MutableProcessorConfigurator that doesn't dynamically update (it can be updated, but -// doesn't guarantee thread safety). -type ImmutableProcessorConfig struct { - TargetQPS float64 `json:"target_qps"` - QPSEquivalenceThreshold float64 `json:"qps_equivalence_threshold"` -} - -// GetTargetQPS implements MutableProcessorConfigurator#GetTargetQPS -func (d ImmutableProcessorConfig) GetTargetQPS() float64 { - return d.TargetQPS -} - -// GetQPSEquivalenceThreshold implements MutableProcessorConfigurator#GetQPSEquivalenceThreshold -func (d ImmutableProcessorConfig) GetQPSEquivalenceThreshold() float64 { - return d.QPSEquivalenceThreshold -} - -// MutableProcessorConfig is a MutableProcessorConfigurator that is thread safe and dynamically updates. 
-type MutableProcessorConfig struct { - targetQPS *atomic.Float64 - qpsEquivalenceThreshold *atomic.Float64 -} - -// NewMutableProcessorConfig returns a MutableProcessorConfigurator that dynamically updates. -func NewMutableProcessorConfig(config ImmutableProcessorConfig) *MutableProcessorConfig { - return &MutableProcessorConfig{ - targetQPS: atomic.NewFloat64(config.GetTargetQPS()), - qpsEquivalenceThreshold: atomic.NewFloat64(config.GetQPSEquivalenceThreshold()), - } -} - -// Update updates the configs. -func (d *MutableProcessorConfig) Update(config ImmutableProcessorConfig) { - d.targetQPS.Store(config.GetTargetQPS()) - d.qpsEquivalenceThreshold.Store(config.GetQPSEquivalenceThreshold()) -} - -// GetTargetQPS implements MutableProcessorConfigurator#GetTargetQPS. -func (d *MutableProcessorConfig) GetTargetQPS() float64 { - return d.targetQPS.Load() -} - -// GetQPSEquivalenceThreshold implements MutableProcessorConfigurator#GetQPSEquivalenceThreshold. -func (d *MutableProcessorConfig) GetQPSEquivalenceThreshold() float64 { - return d.qpsEquivalenceThreshold.Load() -} - -// Builder struct to hold configurations. -type Builder struct { - ThroughputAggregator ThroughputAggregatorConfig `yaml:"throughput_aggregator"` - SamplingProcessor ProcessorConfig `yaml:"sampling_processor"` - - metrics metrics.Factory - logger *zap.Logger -} - -// NewBuilder creates a default builder. 
-func NewBuilder() *Builder { - return &Builder{ - ThroughputAggregator: ThroughputAggregatorConfig{ - AggregationInterval: defaultAggregationInterval, - }, - SamplingProcessor: ProcessorConfig{ - LookbackQPSCount: defaultLookbackQPSCount, - CalculationInterval: defaultCalculationInterval, - LookbackInterval: defaultLookbackInterval, - Delay: defaultDelay, - DefaultSamplingProbability: defaultSamplingProbability, - MinSamplingProbability: defaultMinSamplingProbability, - LowerBoundTracesPerSecond: defaultLowerBoundTracesPerSecond, - LeaderLeaseRefreshInterval: defaultLeaderLeaseRefreshInterval, - FollowerLeaseRefreshInterval: defaultFollowerLeaseRefreshInterval, - Mutable: ImmutableProcessorConfig{ - TargetQPS: defaultTargetQPS, - QPSEquivalenceThreshold: defaultEquivalenceThreshold, - }, - }, - } -} - -// WithMetricsFactory sets metrics factory. -func (b *Builder) WithMetricsFactory(m metrics.Factory) *Builder { - b.metrics = m - return b -} - -// WithLogger sets logger. -func (b *Builder) WithLogger(l *zap.Logger) *Builder { - b.logger = l - return b -} - -func (b *Builder) applyDefaults() { - if b.metrics == nil { - b.metrics = metrics.NullFactory - } - if b.logger == nil { - b.logger = zap.NewNop() - } -} - -// NewThroughputAggregator creates and returns a ThroughputAggregator. -func (b *Builder) NewThroughputAggregator(storage samplingstore.Store) (Aggregator, error) { - b.applyDefaults() - return NewAggregator(b.metrics, b.ThroughputAggregator.AggregationInterval, storage), nil -} - -// NewProcessor creates and returns a SamplingProcessor. 
-func (b *Builder) NewProcessor(hostname string, storage samplingstore.Store, lock distributedlock.Lock) (Processor, error) { - b.applyDefaults() - return NewProcessor(b.SamplingProcessor, hostname, storage, lock, b.metrics, b.logger) -} diff --git a/plugin/sampling/strategystore/adaptive/internal/leader_election.go b/plugin/sampling/strategystore/adaptive/internal/leader_election.go new file mode 100644 index 00000000000..fe3c93df122 --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/internal/leader_election.go @@ -0,0 +1,117 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package internal + +import ( + "sync" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + + dl "github.com/jaegertracing/jaeger/pkg/distributedlock" +) + +var ( + acquireLockErrMsg = "Failed to acquire lock" +) + +// ElectionParticipant partakes in leader election to become leader. +type ElectionParticipant interface { + IsLeader() bool + Start() +} + +type electionParticipant struct { + ElectionParticipantOptions + lock dl.Lock + isLeader *atomic.Bool + resourceName string + closeChan chan struct{} + wg sync.WaitGroup +} + +// ElectionParticipantOptions control behavior of the election participant. TODO func applyDefaults(), parameter error checking, etc. 
+type ElectionParticipantOptions struct { + LeaderLeaseRefreshInterval time.Duration + FollowerLeaseRefreshInterval time.Duration + Logger *zap.Logger +} + +// NewElectionParticipant returns a ElectionParticipant which attempts to become leader. +func NewElectionParticipant(lock dl.Lock, resourceName string, options ElectionParticipantOptions) ElectionParticipant { + return &electionParticipant{ + ElectionParticipantOptions: options, + lock: lock, + resourceName: resourceName, + isLeader: atomic.NewBool(false), + closeChan: make(chan struct{}), + } +} + +// Start runs a background thread which attempts to acquire the leader lock. +func (p *electionParticipant) Start() { + p.wg.Add(1) + go p.runAcquireLockLoop() +} + +// Close implements io.Closer. +func (p *electionParticipant) Close() error { + close(p.closeChan) + p.wg.Wait() + return nil +} + +// IsLeader returns true if this process is the leader. +func (p *electionParticipant) IsLeader() bool { + return p.isLeader.Load() +} + +// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it, +// otherwise it sleeps and attempts to gain the lock again. +func (p *electionParticipant) runAcquireLockLoop() { + defer p.wg.Done() + ticker := time.NewTicker(p.acquireLock()) + for { + select { + case <-ticker.C: + ticker.Stop() + ticker = time.NewTicker(p.acquireLock()) + case <-p.closeChan: + ticker.Stop() + return + } + } +} + +// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry. +func (p *electionParticipant) acquireLock() time.Duration { + if acquiredLeaderLock, err := p.lock.Acquire(p.resourceName, p.FollowerLeaseRefreshInterval); err == nil { + p.setLeader(acquiredLeaderLock) + } else { + p.Logger.Error(acquireLockErrMsg, zap.Error(err)) + } + if p.IsLeader() { + // If this process holds the leader lock, retry with a shorter cadence + // to retain the leader lease. 
+ return p.LeaderLeaseRefreshInterval + } + // If this process failed to acquire the leader lock, retry with a longer cadence + return p.FollowerLeaseRefreshInterval +} + +func (p *electionParticipant) setLeader(isLeader bool) { + p.isLeader.Store(isLeader) +} diff --git a/plugin/sampling/strategystore/adaptive/internal/leader_election_test.go b/plugin/sampling/strategystore/adaptive/internal/leader_election_test.go new file mode 100644 index 00000000000..b5224662f7f --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/internal/leader_election_test.go @@ -0,0 +1,107 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package internal + +import ( + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/atomic" + + lmocks "github.com/jaegertracing/jaeger/pkg/distributedlock/mocks" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +var ( + errTestLock = errors.New("Lock error") +) + +var _ io.Closer = &electionParticipant{} + +func TestAcquireLock(t *testing.T) { + leaderInterval := time.Millisecond + followerInterval := 5 * time.Millisecond + tests := []struct { + isLeader bool + acquiredLock bool + err error + expectedInterval time.Duration + expectedError bool + }{ + {isLeader: true, acquiredLock: true, err: nil, expectedInterval: leaderInterval, expectedError: false}, + {isLeader: true, acquiredLock: false, err: errTestLock, expectedInterval: leaderInterval, expectedError: true}, + {isLeader: true, acquiredLock: false, err: nil, expectedInterval: followerInterval, expectedError: false}, + {isLeader: false, acquiredLock: false, err: nil, expectedInterval: followerInterval, expectedError: false}, + {isLeader: false, acquiredLock: false, err: errTestLock, expectedInterval: followerInterval, expectedError: true}, + {isLeader: false, acquiredLock: true, err: nil, expectedInterval: leaderInterval, expectedError: false}, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + logger, logBuffer := testutils.NewLogger() + mockLock := &lmocks.Lock{} + mockLock.On("Acquire", "sampling_lock", mock.AnythingOfType("time.Duration")).Return(test.acquiredLock, test.err) + + p := &electionParticipant{ + ElectionParticipantOptions: ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: leaderInterval, + FollowerLeaseRefreshInterval: followerInterval, + Logger: logger, + }, + lock: mockLock, + resourceName: "sampling_lock", + isLeader: atomic.NewBool(false), + } + + p.setLeader(test.isLeader) + assert.Equal(t, test.expectedInterval, p.acquireLock()) + assert.Equal(t, 
test.expectedError, testutils.LogMatcher(1, acquireLockErrMsg, logBuffer.Lines())) + }) + } +} + +func TestRunAcquireLockLoop_followerOnly(t *testing.T) { + logger, logBuffer := testutils.NewLogger() + mockLock := &lmocks.Lock{} + mockLock.On("Acquire", "sampling_lock", mock.AnythingOfType("time.Duration")).Return(false, errTestLock) + + p := NewElectionParticipant(mockLock, "sampling_lock", ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: time.Millisecond, + FollowerLeaseRefreshInterval: 5 * time.Millisecond, + Logger: logger, + }, + ) + + defer func() { + assert.NoError(t, p.(io.Closer).Close()) + }() + go p.Start() + + expectedErrorMsg := "Failed to acquire lock" + for i := 0; i < 1000; i++ { + // match logs specific to acquireLockErrMsg. + if testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()) { + break + } + time.Sleep(time.Millisecond) + } + assert.True(t, testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines())) + assert.False(t, p.IsLeader()) +} diff --git a/plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go b/plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go new file mode 100644 index 00000000000..1dadb25289e --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go @@ -0,0 +1,37 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package mocks + +import "github.com/stretchr/testify/mock" + +type ElectionParticipant struct { + mock.Mock +} + +func (_m *ElectionParticipant) IsLeader() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} +func (_m *ElectionParticipant) Start() { + _m.Called() +} diff --git a/plugin/sampling/strategystore/adaptive/processor.go b/plugin/sampling/strategystore/adaptive/processor.go deleted file mode 100644 index b2b254f4c36..00000000000 --- a/plugin/sampling/strategystore/adaptive/processor.go +++ /dev/null @@ -1,553 +0,0 @@ -package adaptive - -import ( - "errors" - "math" - "math/rand" - "sync" - "time" - - "github.com/jaegertracing/jaeger/thrift-gen/sampling" - "github.com/uber-go/atomic" - "github.com/uber/jaeger-lib/metrics" - "go.uber.org/zap" - - "github.com/jaegertracing/jaeger/pkg/distributedlock" - "github.com/jaegertracing/jaeger/plugin/sampling/strategystore/adaptive/calculationstrategy" - "github.com/jaegertracing/jaeger/storage/samplingstore" -) - -const ( - maxSamplingProbability = 1.0 - - samplingLock = "sampling_lock" - - getThroughputErrMsg = "Failed to get throughput from storage" - acquireLockErrMsg = "Failed to acquire lock" - - defaultFollowerProbabilityInterval = 20 * time.Second - - // The number of past entries for samplingCache the leader keeps in memory - serviceCacheSize = 25 -) - -var ( - errIntervals = errors.New("CalculationInterval must be less than LookbackInterval") - errNonZeroIntervals = errors.New("CalculationInterval and LookbackInterval must be greater than 0") - errLockIntervals = errors.New("FollowerLeaseRefreshInterval cannot be less than LeaderLeaseRefreshInterval") - errLookbackQPSCount = errors.New("LookbackQPSCount cannot be less than 1") -) - -type serviceOperationThroughput map[string]map[string]*samplingstore.Throughput - -type throughputBucket struct { - throughput serviceOperationThroughput - interval 
time.Duration - endTime time.Time -} - -// Processor retrieves service throughput over a look back interval and calculates sampling probabilities -// per operation such that each operation is sampled at a specified target QPS. It achieves this by -// retrieving discrete buckets of operation throughput and doing a weighted average of the throughput -// and generating a probability to match the targetQPS. -type Processor interface { - // GetSamplingStrategyResponses returns the sampling strategy response probabilities of a service. - GetSamplingStrategyResponses(service string) sampling.SamplingStrategyResponse - - // Start initializes and starts the sampling processor which regularly calculates sampling probabilities. - Start() - - // Stop stops the processor from calculating probabilities. - Stop() -} - -type processor struct { - sync.RWMutex - ProcessorConfig - - // flag used to determine if this processor has the leader lock. - atomic.Bool - - storage samplingstore.Store - lock distributedlock.Lock - acquireLockStop chan struct{} - calculationStop chan struct{} - updateProbabilitiesStop chan struct{} - hostname string - - // buckets is the number of `calculationInterval` buckets used to calculate the probabilities. - // It is calculated as lookbackInterval / calculationInterval. - buckets int - - // probabilities contains the latest calculated sampling probabilities for service operations. - probabilities samplingstore.ServiceOperationProbabilities - - // qps contains the latest calculated qps for service operations; the calculation is essentially - // throughput / CalculationInterval. - qps samplingstore.ServiceOperationQPS - - // throughputs slice of `buckets` size that stores the aggregated throughput. The latest throughput - // is stored at the head of the slice. - throughputs []*throughputBucket - - // strategyResponses contains the sampling strategies for every service. 
- strategyResponses map[string]*sampling.SamplingStrategyResponse - - logger *zap.Logger - - weightsCache *weightsCache - - probabilityCalculator calculationstrategy.ProbabilityCalculator - - // followerProbabilityInterval determines how often the follower processor updates its probabilities. - // Given only the leader writes probabilities, the followers need to fetch the probabilities into - // cache. - followerProbabilityInterval time.Duration - - serviceCache []samplingCache - - operationsCalculatedGauge metrics.Gauge - calculateProbabilitiesLatency metrics.Timer -} - -// NewProcessor creates a new sampling processor that generates sampling rates for service operations -func NewProcessor( - config ProcessorConfig, - hostname string, - storage samplingstore.Store, - lock distributedlock.Lock, - metricsFactory metrics.Factory, - logger *zap.Logger, -) (Processor, error) { - if config.LookbackInterval < config.CalculationInterval { - return nil, errIntervals - } - if config.CalculationInterval == 0 || config.LookbackInterval == 0 { - return nil, errNonZeroIntervals - } - if config.FollowerLeaseRefreshInterval < config.LeaderLeaseRefreshInterval { - return nil, errLockIntervals - } - if config.LookbackQPSCount < 1 { - return nil, errLookbackQPSCount - } - buckets := int(config.LookbackInterval / config.CalculationInterval) - metricsFactory = metricsFactory.Namespace("adaptive_sampling_processor", nil) - return &processor{ - ProcessorConfig: config, - storage: storage, - buckets: buckets, - probabilities: make(samplingstore.ServiceOperationProbabilities), - qps: make(samplingstore.ServiceOperationQPS), - hostname: hostname, - strategyResponses: make(map[string]*sampling.SamplingStrategyResponse), - logger: logger, - lock: lock, - // TODO make weightsCache and probabilityCalculator configurable - weightsCache: newWeightsCache(), - probabilityCalculator: calculationstrategy.NewPercentageIncreaseCappedCalculator(1.0), - followerProbabilityInterval: 
defaultFollowerProbabilityInterval, - serviceCache: []samplingCache{}, - operationsCalculatedGauge: metricsFactory.Gauge("operations_calculated", nil), - calculateProbabilitiesLatency: metricsFactory.Timer("calculate_probabilities", nil), - }, nil -} - -func (p *processor) GetSamplingStrategyResponses(service string) sampling.SamplingStrategyResponse { - p.RLock() - defer p.RUnlock() - if strategy, ok := p.strategyResponses[service]; ok { - return *strategy - } - return p.generateDefaultSamplingStrategyResponse() -} - -func (p *processor) Start() { - p.logger.Info("Starting sampling processor") - p.acquireLockStop = make(chan struct{}) - p.calculationStop = make(chan struct{}) - p.updateProbabilitiesStop = make(chan struct{}) - p.setLeader(false) - p.loadProbabilities() - p.generateStrategyResponses() - go p.runAcquireLockLoop() - go p.runCalculationLoop() - go p.runUpdateProbabilitiesLoop() -} - -func (p *processor) Stop() { - p.logger.Info("Stopping sampling processor") - close(p.acquireLockStop) - close(p.calculationStop) - close(p.updateProbabilitiesStop) -} - -func (p *processor) loadProbabilities() { - // TODO GetLatestProbabilities API can be changed to return the latest measured qps for initialization - probabilities, err := p.storage.GetLatestProbabilities() - if err != nil { - p.logger.Warn("Failed to initialize probabilities", zap.Error(err)) - return - } - p.Lock() - defer p.Unlock() - p.probabilities = probabilities -} - -// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it, -// otherwise it sleeps and attempts to gain the lock again. 
-func (p *processor) runAcquireLockLoop() { - addJitter(p.LeaderLeaseRefreshInterval) - ticker := time.NewTicker(p.acquireLock()) - for { - select { - case <-ticker.C: - ticker.Stop() - ticker = time.NewTicker(p.acquireLock()) - case <-p.acquireLockStop: - ticker.Stop() - return - } - } -} - -// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry. -func (p *processor) acquireLock() time.Duration { - if acquiredLeaderLock, err := p.lock.Acquire(samplingLock); err == nil { - p.setLeader(acquiredLeaderLock) - } else { - p.logger.Error(acquireLockErrMsg, zap.Error(err)) - } - if p.isLeader() { - // If this host holds the leader lock, retry with a shorter cadence - // to retain the leader lease. - return p.LeaderLeaseRefreshInterval - } - // If this host failed to acquire the leader lock, retry with a longer cadence - return p.FollowerLeaseRefreshInterval -} - -// runUpdateProbabilitiesLoop starts a loop that reads probabilities from storage. -// The follower updates its local cache with the latest probabilities and serves them. -func (p *processor) runUpdateProbabilitiesLoop() { - addJitter(p.followerProbabilityInterval) - ticker := time.NewTicker(p.followerProbabilityInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // Only load probabilities if this processor doesn't hold the leader lock - if !p.isLeader() { - p.loadProbabilities() - p.generateStrategyResponses() - } - case <-p.updateProbabilitiesStop: - return - } - } -} - -func (p *processor) isLeader() bool { - return p.Load() -} - -func (p *processor) setLeader(isLeader bool) { - p.Store(isLeader) -} - -// addJitter sleeps for a random amount of time. Without jitter, if the host holding the leader -// lock were to die, then all other collectors can potentially wait for a full cycle before -// trying to acquire the lock. With jitter, we can reduce the average amount of time before a -// new leader is elected. 
Furthermore, jitter can be used to spread out read load on storage. -func addJitter(jitterAmount time.Duration) { - randomTime := (jitterAmount / 2) + time.Duration(rand.Int63n(int64(jitterAmount/2))) - time.Sleep(randomTime) -} - -func (p *processor) runCalculationLoop() { - lastCheckedTime := time.Now().Add(p.Delay * -1) - p.initializeThroughput(lastCheckedTime) - // NB: the first tick will be slightly delayed by the initializeThroughput call. - ticker := time.NewTicker(p.CalculationInterval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - endTime := time.Now().Add(p.Delay * -1) - startTime := lastCheckedTime - throughput, err := p.storage.GetThroughput(startTime, endTime) - if err != nil { - p.logger.Error(getThroughputErrMsg, zap.Error(err)) - break - } - aggregatedThroughput := p.aggregateThroughput(throughput) - p.prependThroughputBucket(&throughputBucket{ - throughput: aggregatedThroughput, - interval: endTime.Sub(startTime), - endTime: endTime, - }) - lastCheckedTime = endTime - // Load the latest throughput so that if this host ever becomes leader, it - // has the throughput ready in memory. However, only run the actual calculations - // if this host becomes leader. - // TODO fill the throughput buffer only when we're leader - if p.isLeader() { - startTime := time.Now() - probabilities, qps := p.calculateProbabilitiesAndQPS() - p.Lock() - p.probabilities = probabilities - p.qps = qps - p.Unlock() - // NB: This has the potential of running into a race condition if the calculationInterval - // is set to an extremely low value. The worst case scenario is that probabilities is calculated - // and swapped more than once before generateStrategyResponses() and saveProbabilities() are called. - // This will result in one or more batches of probabilities not being saved which is completely - // fine. This race condition should not ever occur anyway since the calculation interval will - // be way longer than the time to run the calculations. 
- p.generateStrategyResponses() - p.calculateProbabilitiesLatency.Record(time.Now().Sub(startTime)) - go p.saveProbabilitiesAndQPS() - } - case <-p.calculationStop: - return - } - } -} - -func (p *processor) saveProbabilitiesAndQPS() { - p.RLock() - defer p.RUnlock() - if err := p.storage.InsertProbabilitiesAndQPS(p.hostname, p.probabilities, p.qps); err != nil { - p.logger.Warn("Could not save probabilities", zap.Error(err)) - } -} - -func (p *processor) prependThroughputBucket(bucket *throughputBucket) { - p.throughputs = append([]*throughputBucket{bucket}, p.throughputs...) - if len(p.throughputs) > p.buckets { - p.throughputs = p.throughputs[0:p.buckets] - } -} - -// aggregateThroughput aggregates operation throughput from different buckets into one. -func (p *processor) aggregateThroughput(throughputs []*samplingstore.Throughput) serviceOperationThroughput { - aggregatedThroughput := make(serviceOperationThroughput) - for _, throughput := range throughputs { - service := throughput.Service - operation := throughput.Operation - if _, ok := aggregatedThroughput[service]; !ok { - aggregatedThroughput[service] = make(map[string]*samplingstore.Throughput) - } - if t, ok := aggregatedThroughput[service][operation]; ok { - t.Count += throughput.Count - t.Probabilities = combineProbabilities(t.Probabilities, throughput.Probabilities) - } else { - aggregatedThroughput[service][operation] = throughput - } - } - return aggregatedThroughput -} - -func (p *processor) initializeThroughput(endTime time.Time) { - for i := 0; i < p.buckets; i++ { - startTime := endTime.Add(p.CalculationInterval * -1) - throughput, err := p.storage.GetThroughput(startTime, endTime) - if err != nil && p.logger != nil { - p.logger.Error(getThroughputErrMsg, zap.Error(err)) - return - } - if len(throughput) == 0 { - return - } - aggregatedThroughput := p.aggregateThroughput(throughput) - p.throughputs = append(p.throughputs, &throughputBucket{ - throughput: aggregatedThroughput, - interval: 
p.CalculationInterval, - endTime: endTime, - }) - endTime = startTime - } -} - -type serviceOperationQPS map[string]map[string][]float64 - -func (p *processor) generateOperationQPS() serviceOperationQPS { - // TODO previous qps buckets have already been calculated, just need to calculate latest batch and append them - // where necessary and throw out the oldest batch. Edge case #buckets < p.buckets, then we shouldn't throw out - qps := make(serviceOperationQPS) - for _, bucket := range p.throughputs { - for svc, operations := range bucket.throughput { - if _, ok := qps[svc]; !ok { - qps[svc] = make(map[string][]float64) - } - for op, throughput := range operations { - if len(qps[svc][op]) >= p.LookbackQPSCount { - continue - } - qps[svc][op] = append(qps[svc][op], calculateQPS(throughput.Count, bucket.interval)) - } - } - } - return qps -} - -func calculateQPS(count int64, interval time.Duration) float64 { - seconds := float64(interval) / float64(time.Second) - return float64(count) / seconds -} - -// calculateWeightedQPS calculates the weighted qps of the slice allQPS where weights are biased towards more recent -// qps. This function assumes that the most recent qps is at the head of the slice. -func (p *processor) calculateWeightedQPS(allQPS []float64) float64 { - if len(allQPS) == 0 { - return 0 - } - weights := p.weightsCache.getWeights(len(allQPS)) - var qps float64 - for i := 0; i < len(allQPS); i++ { - qps += allQPS[i] * weights[i] - } - return qps -} - -func (p *processor) prependServiceCache() { - p.serviceCache = append([]samplingCache{make(samplingCache)}, p.serviceCache...) 
- if len(p.serviceCache) > serviceCacheSize { - p.serviceCache = p.serviceCache[0:serviceCacheSize] - } -} - -func (p *processor) calculateProbabilitiesAndQPS() (samplingstore.ServiceOperationProbabilities, samplingstore.ServiceOperationQPS) { - p.prependServiceCache() - retProbabilities := make(samplingstore.ServiceOperationProbabilities) - retQPS := make(samplingstore.ServiceOperationQPS) - svcOpQPS := p.generateOperationQPS() - totalOperations := int64(0) - for svc, opQPS := range svcOpQPS { - if _, ok := retProbabilities[svc]; !ok { - retProbabilities[svc] = make(map[string]float64) - } - if _, ok := retQPS[svc]; !ok { - retQPS[svc] = make(map[string]float64) - } - for op, qps := range opQPS { - totalOperations++ - avgQPS := p.calculateWeightedQPS(qps) - retQPS[svc][op] = avgQPS - retProbabilities[svc][op] = p.calculateProbability(svc, op, avgQPS) - } - } - p.operationsCalculatedGauge.Update(totalOperations) - return retProbabilities, retQPS -} - -func (p *processor) calculateProbability(service, operation string, qps float64) float64 { - oldProbability := p.DefaultSamplingProbability - // TODO: is this loop overly expensive? - p.RLock() - if opProbabilities, ok := p.probabilities[service]; ok { - if probability, ok := opProbabilities[operation]; ok { - oldProbability = probability - } - } - latestThroughput := p.throughputs[0].throughput - p.RUnlock() - - usingAdaptiveSampling := p.usingAdaptiveSampling(oldProbability, service, operation, latestThroughput) - p.serviceCache[0].Set(service, operation, &samplingCacheEntry{ - probability: oldProbability, - usingAdapative: usingAdaptiveSampling, - }) - - // Short circuit if the qps is close enough to targetQPS or if the service isn't using - // adaptive sampling. 
- targetQPS := p.Mutable.GetTargetQPS() - if math.Abs(qps-targetQPS) < p.Mutable.GetQPSEquivalenceThreshold() || !usingAdaptiveSampling { - return oldProbability - } - var newProbability float64 - if floatEquals(qps, 0) { - // Edge case, we double the sampling probability if the QPS is 0 so that we force the service - // to at least sample one span probabilistically - newProbability = oldProbability * 2.0 - } else { - newProbability = p.probabilityCalculator.Calculate(targetQPS, qps, oldProbability) - } - return math.Min(maxSamplingProbability, math.Max(p.MinSamplingProbability, newProbability)) -} - -func combineProbabilities(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} { - for probability := range p2 { - p1[probability] = struct{}{} - } - return p1 -} - -func (p *processor) usingAdaptiveSampling(probability float64, service, operation string, throughput serviceOperationThroughput) bool { - if floatEquals(probability, p.DefaultSamplingProbability) { - // If the service is seen for the first time, assume it's using adaptive sampling (ie prob == defaultProb). - // Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal - // the defaultProb so the logic will fall through. - return true - } - var opThroughput *samplingstore.Throughput - svcThroughput, ok := throughput[service] - if ok { - opThroughput = svcThroughput[operation] - } - if opThroughput != nil { - f := truncateFloat(probability) - _, ok = opThroughput.Probabilities[f] - return ok - } - // By this point, we know that there's no recorded throughput for this operation for this round - // of calculation. Check the previous bucket to see if this operation was using adaptive sampling - // before. 
- if len(p.serviceCache) > 1 { - if e := p.serviceCache[1].Get(service, operation); e != nil { - return e.usingAdapative && e.probability != p.DefaultSamplingProbability - } - } - return false -} - -// generateStrategyResponses generates a SamplingStrategyResponse from the calculated sampling probabilities. -func (p *processor) generateStrategyResponses() { - p.RLock() - strategies := make(map[string]*sampling.SamplingStrategyResponse) - for svc, opProbabilities := range p.probabilities { - opStrategies := make([]*sampling.OperationSamplingStrategy, len(opProbabilities)) - var idx int - for op, probability := range opProbabilities { - opStrategies[idx] = &sampling.OperationSamplingStrategy{ - Operation: op, - ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{ - SamplingRate: probability, - }, - } - idx++ - } - strategy := p.generateDefaultSamplingStrategyResponse() - strategy.OperationSampling.PerOperationStrategies = opStrategies - strategies[svc] = &strategy - } - p.RUnlock() - - p.Lock() - defer p.Unlock() - p.strategyResponses = strategies -} - -func (p *processor) generateDefaultSamplingStrategyResponse() sampling.SamplingStrategyResponse { - return sampling.SamplingStrategyResponse{ - StrategyType: sampling.SamplingStrategyType_PROBABILISTIC, - OperationSampling: &sampling.PerOperationSamplingStrategies{ - DefaultSamplingProbability: p.DefaultSamplingProbability, - DefaultLowerBoundTracesPerSecond: p.LowerBoundTracesPerSecond, - }, - } -} - -func floatEquals(a, b float64) bool { - return math.Abs(a-b) < 0.0000000001 -} From 3c2fe4f6072184ad225d802c86dfa8a295d7969e Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Tue, 7 Aug 2018 10:52:11 -0400 Subject: [PATCH 3/7] increase coverage Signed-off-by: Won Jun Jang --- .../calculationstrategy/interface_test.go | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) create mode 100644 plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go diff --git 
a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go new file mode 100644 index 00000000000..bdbd25c20ce --- /dev/null +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go @@ -0,0 +1,29 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculationstrategy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculate(t *testing.T) { + c := Func(func(targetQPS, qps, oldProbability float64) float64 { + return targetQPS + }) + val := 1.0 + assert.Equal(t, val, c.Calculate(val, 0, 0)) +} From 4605109691352e7ee485379392bb44c223164f51 Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Tue, 7 Aug 2018 11:13:33 -0400 Subject: [PATCH 4/7] rename test Signed-off-by: Won Jun Jang --- .../adaptive/calculationstrategy/interface_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go index bdbd25c20ce..9ddb8c4163e 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go +++ b/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCalculate(t *testing.T) { 
+func TestFunc(t *testing.T) { c := Func(func(targetQPS, qps, oldProbability float64) float64 { return targetQPS }) From 0b4a788cc0f194067b790c61b67a30611684292b Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Tue, 13 Nov 2018 07:56:11 +0800 Subject: [PATCH 5/7] address comments Signed-off-by: Won Jun Jang --- pkg/io/starter.go | 20 +++++++++++++++++++ .../calculationstrategy/interface.go | 8 ++++---- .../calculationstrategy/interface_test.go | 4 ++-- .../percentage_increase_capped_calculator.go | 0 ...centage_increase_capped_calculator_test.go | 2 +- .../leaderelection}/leader_election.go | 14 ++++++------- .../leaderelection}/leader_election_test.go | 9 +++++---- .../mocks/ElectionParticipant.go | 0 8 files changed, 39 insertions(+), 18 deletions(-) create mode 100644 pkg/io/starter.go rename plugin/sampling/{strategystore/adaptive => internal}/calculationstrategy/interface.go (73%) rename plugin/sampling/{strategystore/adaptive => internal}/calculationstrategy/interface_test.go (87%) rename plugin/sampling/{strategystore/adaptive => internal}/calculationstrategy/percentage_increase_capped_calculator.go (100%) rename plugin/sampling/{strategystore/adaptive => internal}/calculationstrategy/percentage_increase_capped_calculator_test.go (95%) rename plugin/sampling/{strategystore/adaptive/internal => internal/leaderelection}/leader_election.go (92%) rename plugin/sampling/{strategystore/adaptive/internal => internal/leaderelection}/leader_election_test.go (95%) rename plugin/sampling/{strategystore/adaptive/internal => internal/leaderelection}/mocks/ElectionParticipant.go (100%) diff --git a/pkg/io/starter.go b/pkg/io/starter.go new file mode 100644 index 00000000000..2c3e88b79f0 --- /dev/null +++ b/pkg/io/starter.go @@ -0,0 +1,20 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io + +// Starter is the interface that wraps the basic Start() method. +type Starter interface { + Start() error +} diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go b/plugin/sampling/internal/calculationstrategy/interface.go similarity index 73% rename from plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go rename to plugin/sampling/internal/calculationstrategy/interface.go index dae7aced57e..2fcc018685e 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface.go +++ b/plugin/sampling/internal/calculationstrategy/interface.go @@ -19,10 +19,10 @@ type ProbabilityCalculator interface { Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64) } -// Func wraps a function of appropriate signature and makes a ProbabilityCalculator from it. -type Func func(targetQPS, curQPS, prevProbability float64) (newProbability float64) +// Calculate wraps a function of appropriate signature and makes a ProbabilityCalculator from it. +type Calculate func(targetQPS, curQPS, prevProbability float64) (newProbability float64) // Calculate implements Calculator interface. 
-func (f Func) Calculate(targetQPS, curQPS, prevProbability float64) float64 { - return f(targetQPS, curQPS, prevProbability) +func (c Calculate) Calculate(targetQPS, curQPS, prevProbability float64) float64 { + return c(targetQPS, curQPS, prevProbability) } diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go b/plugin/sampling/internal/calculationstrategy/interface_test.go similarity index 87% rename from plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go rename to plugin/sampling/internal/calculationstrategy/interface_test.go index 9ddb8c4163e..fd0a4c54e8e 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/interface_test.go +++ b/plugin/sampling/internal/calculationstrategy/interface_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestFunc(t *testing.T) { - c := Func(func(targetQPS, qps, oldProbability float64) float64 { +func TestCalculate(t *testing.T) { + c := Calculate(func(targetQPS, qps, oldProbability float64) float64 { return targetQPS }) val := 1.0 diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator.go similarity index 100% rename from plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator.go rename to plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator.go diff --git a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go similarity index 95% rename from plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go rename to plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go index 
5fc0aa0c210..9c253000fd5 100644 --- a/plugin/sampling/strategystore/adaptive/calculationstrategy/percentage_increase_capped_calculator_test.go +++ b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go @@ -20,7 +20,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCalculate(t *testing.T) { +func TestPercentageIncreaseCappedCalculator(t *testing.T) { calculator := NewPercentageIncreaseCappedCalculator(0) tests := []struct { targetQPS float64 diff --git a/plugin/sampling/strategystore/adaptive/internal/leader_election.go b/plugin/sampling/internal/leaderelection/leader_election.go similarity index 92% rename from plugin/sampling/strategystore/adaptive/internal/leader_election.go rename to plugin/sampling/internal/leaderelection/leader_election.go index fe3c93df122..03f379bf260 100644 --- a/plugin/sampling/strategystore/adaptive/internal/leader_election.go +++ b/plugin/sampling/internal/leaderelection/leader_election.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package leaderelection import ( "sync" @@ -31,7 +31,6 @@ var ( // ElectionParticipant partakes in leader election to become leader. type ElectionParticipant interface { IsLeader() bool - Start() } type electionParticipant struct { @@ -54,17 +53,18 @@ type ElectionParticipantOptions struct { func NewElectionParticipant(lock dl.Lock, resourceName string, options ElectionParticipantOptions) ElectionParticipant { return &electionParticipant{ ElectionParticipantOptions: options, - lock: lock, - resourceName: resourceName, - isLeader: atomic.NewBool(false), - closeChan: make(chan struct{}), + lock: lock, + resourceName: resourceName, + isLeader: atomic.NewBool(false), + closeChan: make(chan struct{}), } } // Start runs a background thread which attempts to acquire the leader lock. 
-func (p *electionParticipant) Start() { +func (p *electionParticipant) Start() error { p.wg.Add(1) go p.runAcquireLockLoop() + return nil } // Close implements io.Closer. diff --git a/plugin/sampling/strategystore/adaptive/internal/leader_election_test.go b/plugin/sampling/internal/leaderelection/leader_election_test.go similarity index 95% rename from plugin/sampling/strategystore/adaptive/internal/leader_election_test.go rename to plugin/sampling/internal/leaderelection/leader_election_test.go index b5224662f7f..56165eaa1e3 100644 --- a/plugin/sampling/strategystore/adaptive/internal/leader_election_test.go +++ b/plugin/sampling/internal/leaderelection/leader_election_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package internal +package leaderelection import ( "errors" @@ -26,6 +26,7 @@ import ( "go.uber.org/atomic" lmocks "github.com/jaegertracing/jaeger/pkg/distributedlock/mocks" + jio "github.com/jaegertracing/jaeger/pkg/io" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -63,7 +64,7 @@ func TestAcquireLock(t *testing.T) { ElectionParticipantOptions: ElectionParticipantOptions{ LeaderLeaseRefreshInterval: leaderInterval, FollowerLeaseRefreshInterval: followerInterval, - Logger: logger, + Logger: logger, }, lock: mockLock, resourceName: "sampling_lock", @@ -85,14 +86,14 @@ func TestRunAcquireLockLoop_followerOnly(t *testing.T) { p := NewElectionParticipant(mockLock, "sampling_lock", ElectionParticipantOptions{ LeaderLeaseRefreshInterval: time.Millisecond, FollowerLeaseRefreshInterval: 5 * time.Millisecond, - Logger: logger, + Logger: logger, }, ) defer func() { assert.NoError(t, p.(io.Closer).Close()) }() - go p.Start() + go p.(jio.Starter).Start() expectedErrorMsg := "Failed to acquire lock" for i := 0; i < 1000; i++ { diff --git a/plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go 
b/plugin/sampling/internal/leaderelection/mocks/ElectionParticipant.go similarity index 100% rename from plugin/sampling/strategystore/adaptive/internal/mocks/ElectionParticipant.go rename to plugin/sampling/internal/leaderelection/mocks/ElectionParticipant.go From e272f60c2a78b832d708243683de21ad97324daf Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Wed, 14 Nov 2018 01:27:50 +0800 Subject: [PATCH 6/7] address comments Signed-off-by: Won Jun Jang --- pkg/testutils/logger.go | 11 ++++++++--- pkg/testutils/logger_test.go | 9 ++++++--- .../internal/calculationstrategy/interface.go | 6 +++--- .../internal/calculationstrategy/interface_test.go | 4 ++-- .../internal/leaderelection/leader_election.go | 2 +- .../internal/leaderelection/leader_election_test.go | 8 +++++--- 6 files changed, 25 insertions(+), 15 deletions(-) diff --git a/pkg/testutils/logger.go b/pkg/testutils/logger.go index 141d36a86a1..8ffc85527b9 100644 --- a/pkg/testutils/logger.go +++ b/pkg/testutils/logger.go @@ -16,6 +16,7 @@ package testutils import ( "encoding/json" + "fmt" "strings" "sync" @@ -91,9 +92,10 @@ func (b *Buffer) Write(p []byte) (int, error) { } // LogMatcher is a helper func that returns true if the subStr appears more than 'occurrences' times in the logs. 
-var LogMatcher = func(occurrences int, subStr string, logs []string) bool { +var LogMatcher = func(occurrences int, subStr string, logs []string) (bool, string) { + errMsg := fmt.Sprintf("subStr '%s' does not occur %d time(s) in %v", subStr, occurrences, logs) if len(logs) < occurrences { - return false + return false, errMsg } var count int for _, log := range logs { @@ -101,5 +103,8 @@ var LogMatcher = func(occurrences int, subStr string, logs []string) bool { count++ } } - return count >= occurrences + if count >= occurrences { + return true, "" + } + return false, errMsg } diff --git a/pkg/testutils/logger_test.go b/pkg/testutils/logger_test.go index 2d37c1e67be..a73443fdcd1 100644 --- a/pkg/testutils/logger_test.go +++ b/pkg/testutils/logger_test.go @@ -74,16 +74,19 @@ func TestLogMatcher(t *testing.T) { subStr string logs []string expected bool + errMsg string }{ - {occurences: 1, expected: false}, + {occurences: 1, expected: false, errMsg: "subStr '' does not occur 1 time(s) in []"}, {occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true}, - {occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false}, + {occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false, errMsg: "subStr 'hi' does not occur 3 time(s) in [hi hi]"}, {occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true}, } for i, tt := range tests { test := tt t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - assert.Equal(t, test.expected, LogMatcher(test.occurences, test.subStr, test.logs)) + match, errMsg := LogMatcher(test.occurences, test.subStr, test.logs) + assert.Equal(t, test.expected, match) + assert.Equal(t, test.errMsg, errMsg) }) } } diff --git a/plugin/sampling/internal/calculationstrategy/interface.go b/plugin/sampling/internal/calculationstrategy/interface.go index 2fcc018685e..e24924440d3 100644 --- a/plugin/sampling/internal/calculationstrategy/interface.go +++ 
b/plugin/sampling/internal/calculationstrategy/interface.go @@ -19,10 +19,10 @@ type ProbabilityCalculator interface { Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64) } -// Calculate wraps a function of appropriate signature and makes a ProbabilityCalculator from it. -type Calculate func(targetQPS, curQPS, prevProbability float64) (newProbability float64) +// CalculateFunc wraps a function of appropriate signature and makes a ProbabilityCalculator from it. +type CalculateFunc func(targetQPS, curQPS, prevProbability float64) (newProbability float64) // Calculate implements Calculator interface. -func (c Calculate) Calculate(targetQPS, curQPS, prevProbability float64) float64 { +func (c CalculateFunc) Calculate(targetQPS, curQPS, prevProbability float64) float64 { return c(targetQPS, curQPS, prevProbability) } diff --git a/plugin/sampling/internal/calculationstrategy/interface_test.go b/plugin/sampling/internal/calculationstrategy/interface_test.go index fd0a4c54e8e..f6a9a34f6b0 100644 --- a/plugin/sampling/internal/calculationstrategy/interface_test.go +++ b/plugin/sampling/internal/calculationstrategy/interface_test.go @@ -20,8 +20,8 @@ import ( "github.com/stretchr/testify/assert" ) -func TestCalculate(t *testing.T) { - c := Calculate(func(targetQPS, qps, oldProbability float64) float64 { +func TestCalculateFunc(t *testing.T) { + c := CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 { return targetQPS }) val := 1.0 diff --git a/plugin/sampling/internal/leaderelection/leader_election.go b/plugin/sampling/internal/leaderelection/leader_election.go index 03f379bf260..070ee284ebc 100644 --- a/plugin/sampling/internal/leaderelection/leader_election.go +++ b/plugin/sampling/internal/leaderelection/leader_election.go @@ -24,7 +24,7 @@ import ( dl "github.com/jaegertracing/jaeger/pkg/distributedlock" ) -var ( +const ( acquireLockErrMsg = "Failed to acquire lock" ) diff --git 
a/plugin/sampling/internal/leaderelection/leader_election_test.go b/plugin/sampling/internal/leaderelection/leader_election_test.go index 56165eaa1e3..1cbcf356962 100644 --- a/plugin/sampling/internal/leaderelection/leader_election_test.go +++ b/plugin/sampling/internal/leaderelection/leader_election_test.go @@ -73,7 +73,8 @@ func TestAcquireLock(t *testing.T) { p.setLeader(test.isLeader) assert.Equal(t, test.expectedInterval, p.acquireLock()) - assert.Equal(t, test.expectedError, testutils.LogMatcher(1, acquireLockErrMsg, logBuffer.Lines())) + match, errMsg := testutils.LogMatcher(1, acquireLockErrMsg, logBuffer.Lines()) + assert.Equal(t, test.expectedError, match, errMsg) }) } } @@ -98,11 +99,12 @@ func TestRunAcquireLockLoop_followerOnly(t *testing.T) { expectedErrorMsg := "Failed to acquire lock" for i := 0; i < 1000; i++ { // match logs specific to acquireLockErrMsg. - if testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()) { + if match, _ := testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()); match { break } time.Sleep(time.Millisecond) } - assert.True(t, testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines())) + match, errMsg := testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()) + assert.True(t, match, errMsg) assert.False(t, p.IsLeader()) } From 5f52d42ce3ee7af6785cce14302c5ddf5ed5783e Mon Sep 17 00:00:00 2001 From: Won Jun Jang Date: Wed, 14 Nov 2018 02:12:52 +0800 Subject: [PATCH 7/7] code coverage Signed-off-by: Won Jun Jang --- pkg/testutils/logger_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/testutils/logger_test.go b/pkg/testutils/logger_test.go index a73443fdcd1..1293a199422 100644 --- a/pkg/testutils/logger_test.go +++ b/pkg/testutils/logger_test.go @@ -80,6 +80,7 @@ func TestLogMatcher(t *testing.T) { {occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true}, {occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false, errMsg: "subStr 'hi' does not occur 3 time(s) in [hi 
hi]"}, {occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true}, + {occurences: 1, subStr: "hi", logs: []string{"bye", "bye"}, expected: false, errMsg: "subStr 'hi' does not occur 1 time(s) in [bye bye]"}, } for i, tt := range tests { test := tt