diff --git a/pkg/io/starter.go b/pkg/io/starter.go new file mode 100644 index 00000000000..2c3e88b79f0 --- /dev/null +++ b/pkg/io/starter.go @@ -0,0 +1,20 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package io + +// Starter is the interface that wraps the basic Start() method. +type Starter interface { + Start() error +} diff --git a/pkg/testutils/logger.go b/pkg/testutils/logger.go index 87ad527aa1e..8ffc85527b9 100644 --- a/pkg/testutils/logger.go +++ b/pkg/testutils/logger.go @@ -16,6 +16,8 @@ package testutils import ( "encoding/json" + "fmt" + "strings" "sync" "go.uber.org/zap" @@ -88,3 +90,21 @@ func (b *Buffer) Write(p []byte) (int, error) { defer b.Unlock() return b.Buffer.Write(p) } + +// LogMatcher is a helper func that returns true if the subStr appears more than 'occurrences' times in the logs. +var LogMatcher = func(occurrences int, subStr string, logs []string) (bool, string) { + errMsg := fmt.Sprintf("subStr '%s' does not occur %d time(s) in %v", subStr, occurrences, logs) + if len(logs) < occurrences { + return false, errMsg + } + var count int + for _, log := range logs { + if strings.Contains(log, subStr) { + count++ + } + } + if count >= occurrences { + return true, "" + } + return false, errMsg +} diff --git a/pkg/testutils/logger_test.go b/pkg/testutils/logger_test.go index 402bdfe5330..1293a199422 100644 --- a/pkg/testutils/logger_test.go +++ b/pkg/testutils/logger_test.go @@ -15,6 +15,7 @@ package testutils import ( + "fmt" "sync" "testing" @@ -66,3 +67,27 @@ func TestRaceCondition(t *testing.T) { close(start) finish.Wait() } + +func TestLogMatcher(t *testing.T) { + tests := []struct { + occurences int + subStr string + logs []string + expected bool + errMsg string + }{ + {occurences: 1, expected: false, errMsg: "subStr '' does not occur 1 time(s) in []"}, + {occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true}, + {occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false, errMsg: "subStr 'hi' does not occur 3 time(s) in [hi hi]"}, + {occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true}, + {occurences: 1, subStr: "hi", logs: []string{"bye", "bye"}, expected: false, errMsg: "subStr 'hi' does not occur 1 time(s) in [bye bye]"}, + } + for i, tt := range tests { + test := tt + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + match, errMsg := LogMatcher(test.occurences, test.subStr, test.logs) + assert.Equal(t, test.expected, match) + assert.Equal(t, test.errMsg, errMsg) + }) + } +} diff --git a/plugin/sampling/internal/calculationstrategy/interface.go b/plugin/sampling/internal/calculationstrategy/interface.go new file mode 100644 index 00000000000..e24924440d3 --- /dev/null +++ b/plugin/sampling/internal/calculationstrategy/interface.go @@ -0,0 +1,28 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculationstrategy + +// ProbabilityCalculator calculates the new probability given the current and target QPS +type ProbabilityCalculator interface { + Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64) +} + +// CalculateFunc wraps a function of appropriate signature and makes a ProbabilityCalculator from it. +type CalculateFunc func(targetQPS, curQPS, prevProbability float64) (newProbability float64) + +// Calculate implements Calculator interface. +func (c CalculateFunc) Calculate(targetQPS, curQPS, prevProbability float64) float64 { + return c(targetQPS, curQPS, prevProbability) +} diff --git a/plugin/sampling/internal/calculationstrategy/interface_test.go b/plugin/sampling/internal/calculationstrategy/interface_test.go new file mode 100644 index 00000000000..f6a9a34f6b0 --- /dev/null +++ b/plugin/sampling/internal/calculationstrategy/interface_test.go @@ -0,0 +1,29 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculationstrategy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCalculateFunc(t *testing.T) { + c := CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 { + return targetQPS + }) + val := 1.0 + assert.Equal(t, val, c.Calculate(val, 0, 0)) +} diff --git a/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator.go b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator.go new file mode 100644 index 00000000000..3a8f73a2d3a --- /dev/null +++ b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator.go @@ -0,0 +1,60 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculationstrategy + +const ( + defaultPercentageIncreaseCap = 0.5 +) + +// PercentageIncreaseCappedCalculator is a probability calculator that caps the probability +// increase to a certain percentage of the previous probability. +// +// Given prevProb = 0.1, newProb = 0.5, and cap = 0.5: +// (0.5 - 0.1)/0.1 = 400% increase. Given that our cap is 50%, the probability can only +// increase to 0.15. +// +// Given prevProb = 0.4, newProb = 0.5, and cap = 0.5: +// (0.5 - 0.4)/0.4 = 25% increase. Given that this is below our cap of 50%, the probability +// can increase to 0.5. +type PercentageIncreaseCappedCalculator struct { + percentageIncreaseCap float64 +} + +// NewPercentageIncreaseCappedCalculator returns a new percentage increase capped calculator. +func NewPercentageIncreaseCappedCalculator(percentageIncreaseCap float64) PercentageIncreaseCappedCalculator { + if percentageIncreaseCap == 0 { + percentageIncreaseCap = defaultPercentageIncreaseCap + } + return PercentageIncreaseCappedCalculator{ + percentageIncreaseCap: percentageIncreaseCap, + } +} + +// Calculate calculates the new probability. +func (c PercentageIncreaseCappedCalculator) Calculate(targetQPS, curQPS, prevProbability float64) float64 { + factor := targetQPS / curQPS + newProbability := prevProbability * factor + // If curQPS is lower than the targetQPS, we need to increase the probability slowly to + // defend against oversampling. + // Else if curQPS is higher than the targetQPS, jump directly to the newProbability to + // defend against oversampling. + if factor > 1.0 { + percentIncrease := (newProbability - prevProbability) / prevProbability + if percentIncrease > c.percentageIncreaseCap { + newProbability = prevProbability + (prevProbability * c.percentageIncreaseCap) + } + } + return newProbability +} diff --git a/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go new file mode 100644 index 00000000000..9c253000fd5 --- /dev/null +++ b/plugin/sampling/internal/calculationstrategy/percentage_increase_capped_calculator_test.go @@ -0,0 +1,40 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package calculationstrategy + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPercentageIncreaseCappedCalculator(t *testing.T) { + calculator := NewPercentageIncreaseCappedCalculator(0) + tests := []struct { + targetQPS float64 + curQPS float64 + oldProbability float64 + expectedProbability float64 + testName string + }{ + {1.0, 2.0, 0.1, 0.05, "test1"}, + {1.0, 0.5, 0.1, 0.15, "test2"}, + {1.0, 0.8, 0.1, 0.125, "test3"}, + } + for _, tt := range tests { + probability := calculator.Calculate(tt.targetQPS, tt.curQPS, tt.oldProbability) + assert.InDelta(t, probability, tt.expectedProbability, 0.0001, tt.testName) + } +} diff --git a/plugin/sampling/internal/leaderelection/leader_election.go b/plugin/sampling/internal/leaderelection/leader_election.go new file mode 100644 index 00000000000..070ee284ebc --- /dev/null +++ b/plugin/sampling/internal/leaderelection/leader_election.go @@ -0,0 +1,117 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leaderelection + +import ( + "sync" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + + dl "github.com/jaegertracing/jaeger/pkg/distributedlock" +) + +const ( + acquireLockErrMsg = "Failed to acquire lock" +) + +// ElectionParticipant partakes in leader election to become leader. +type ElectionParticipant interface { + IsLeader() bool +} + +type electionParticipant struct { + ElectionParticipantOptions + lock dl.Lock + isLeader *atomic.Bool + resourceName string + closeChan chan struct{} + wg sync.WaitGroup +} + +// ElectionParticipantOptions control behavior of the election participant. TODO func applyDefaults(), parameter error checking, etc. +type ElectionParticipantOptions struct { + LeaderLeaseRefreshInterval time.Duration + FollowerLeaseRefreshInterval time.Duration + Logger *zap.Logger +} + +// NewElectionParticipant returns a ElectionParticipant which attempts to become leader. +func NewElectionParticipant(lock dl.Lock, resourceName string, options ElectionParticipantOptions) ElectionParticipant { + return &electionParticipant{ + ElectionParticipantOptions: options, + lock: lock, + resourceName: resourceName, + isLeader: atomic.NewBool(false), + closeChan: make(chan struct{}), + } +} + +// Start runs a background thread which attempts to acquire the leader lock. +func (p *electionParticipant) Start() error { + p.wg.Add(1) + go p.runAcquireLockLoop() + return nil +} + +// Close implements io.Closer. +func (p *electionParticipant) Close() error { + close(p.closeChan) + p.wg.Wait() + return nil +} + +// IsLeader returns true if this process is the leader. +func (p *electionParticipant) IsLeader() bool { + return p.isLeader.Load() +} + +// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it, +// otherwise it sleeps and attempts to gain the lock again. +func (p *electionParticipant) runAcquireLockLoop() { + defer p.wg.Done() + ticker := time.NewTicker(p.acquireLock()) + for { + select { + case <-ticker.C: + ticker.Stop() + ticker = time.NewTicker(p.acquireLock()) + case <-p.closeChan: + ticker.Stop() + return + } + } +} + +// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry. +func (p *electionParticipant) acquireLock() time.Duration { + if acquiredLeaderLock, err := p.lock.Acquire(p.resourceName, p.FollowerLeaseRefreshInterval); err == nil { + p.setLeader(acquiredLeaderLock) + } else { + p.Logger.Error(acquireLockErrMsg, zap.Error(err)) + } + if p.IsLeader() { + // If this process holds the leader lock, retry with a shorter cadence + // to retain the leader lease. + return p.LeaderLeaseRefreshInterval + } + // If this process failed to acquire the leader lock, retry with a longer cadence + return p.FollowerLeaseRefreshInterval +} + +func (p *electionParticipant) setLeader(isLeader bool) { + p.isLeader.Store(isLeader) +} diff --git a/plugin/sampling/internal/leaderelection/leader_election_test.go b/plugin/sampling/internal/leaderelection/leader_election_test.go new file mode 100644 index 00000000000..1cbcf356962 --- /dev/null +++ b/plugin/sampling/internal/leaderelection/leader_election_test.go @@ -0,0 +1,110 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package leaderelection + +import ( + "errors" + "fmt" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "go.uber.org/atomic" + + lmocks "github.com/jaegertracing/jaeger/pkg/distributedlock/mocks" + jio "github.com/jaegertracing/jaeger/pkg/io" + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +var ( + errTestLock = errors.New("Lock error") +) + +var _ io.Closer = &electionParticipant{} + +func TestAcquireLock(t *testing.T) { + leaderInterval := time.Millisecond + followerInterval := 5 * time.Millisecond + tests := []struct { + isLeader bool + acquiredLock bool + err error + expectedInterval time.Duration + expectedError bool + }{ + {isLeader: true, acquiredLock: true, err: nil, expectedInterval: leaderInterval, expectedError: false}, + {isLeader: true, acquiredLock: false, err: errTestLock, expectedInterval: leaderInterval, expectedError: true}, + {isLeader: true, acquiredLock: false, err: nil, expectedInterval: followerInterval, expectedError: false}, + {isLeader: false, acquiredLock: false, err: nil, expectedInterval: followerInterval, expectedError: false}, + {isLeader: false, acquiredLock: false, err: errTestLock, expectedInterval: followerInterval, expectedError: true}, + {isLeader: false, acquiredLock: true, err: nil, expectedInterval: leaderInterval, expectedError: false}, + } + + for i, test := range tests { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + logger, logBuffer := testutils.NewLogger() + mockLock := &lmocks.Lock{} + mockLock.On("Acquire", "sampling_lock", mock.AnythingOfType("time.Duration")).Return(test.acquiredLock, test.err) + + p := &electionParticipant{ + ElectionParticipantOptions: ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: leaderInterval, + FollowerLeaseRefreshInterval: followerInterval, + Logger: logger, + }, + lock: mockLock, + resourceName: "sampling_lock", + isLeader: atomic.NewBool(false), + } + + p.setLeader(test.isLeader) + assert.Equal(t, test.expectedInterval, p.acquireLock()) + match, errMsg := testutils.LogMatcher(1, acquireLockErrMsg, logBuffer.Lines()) + assert.Equal(t, test.expectedError, match, errMsg) + }) + } +} + +func TestRunAcquireLockLoop_followerOnly(t *testing.T) { + logger, logBuffer := testutils.NewLogger() + mockLock := &lmocks.Lock{} + mockLock.On("Acquire", "sampling_lock", mock.AnythingOfType("time.Duration")).Return(false, errTestLock) + + p := NewElectionParticipant(mockLock, "sampling_lock", ElectionParticipantOptions{ + LeaderLeaseRefreshInterval: time.Millisecond, + FollowerLeaseRefreshInterval: 5 * time.Millisecond, + Logger: logger, + }, + ) + + defer func() { + assert.NoError(t, p.(io.Closer).Close()) + }() + go p.(jio.Starter).Start() + + expectedErrorMsg := "Failed to acquire lock" + for i := 0; i < 1000; i++ { + // match logs specific to acquireLockErrMsg. + if match, _ := testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()); match { + break + } + time.Sleep(time.Millisecond) + } + match, errMsg := testutils.LogMatcher(2, expectedErrorMsg, logBuffer.Lines()) + assert.True(t, match, errMsg) + assert.False(t, p.IsLeader()) +} diff --git a/plugin/sampling/internal/leaderelection/mocks/ElectionParticipant.go b/plugin/sampling/internal/leaderelection/mocks/ElectionParticipant.go new file mode 100644 index 00000000000..1dadb25289e --- /dev/null +++ b/plugin/sampling/internal/leaderelection/mocks/ElectionParticipant.go @@ -0,0 +1,37 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import "github.com/stretchr/testify/mock" + +type ElectionParticipant struct { + mock.Mock +} + +func (_m *ElectionParticipant) IsLeader() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} +func (_m *ElectionParticipant) Start() { + _m.Called() +}