Skip to content

Commit

Permalink
Start moving components of adaptive sampling to OSS (#973)
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored Nov 13, 2018
1 parent c296998 commit 4cc25e6
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/io/starter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io

// Starter is the interface that wraps the basic Start() method.
type Starter interface {
Start() error
}
20 changes: 20 additions & 0 deletions pkg/testutils/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package testutils

import (
"encoding/json"
"fmt"
"strings"
"sync"

"go.uber.org/zap"
Expand Down Expand Up @@ -88,3 +90,21 @@ func (b *Buffer) Write(p []byte) (int, error) {
defer b.Unlock()
return b.Buffer.Write(p)
}

// LogMatcher is a helper func that returns true if the subStr appears more than 'occurrences' times in the logs.
var LogMatcher = func(occurrences int, subStr string, logs []string) (bool, string) {
errMsg := fmt.Sprintf("subStr '%s' does not occur %d time(s) in %v", subStr, occurrences, logs)
if len(logs) < occurrences {
return false, errMsg
}
var count int
for _, log := range logs {
if strings.Contains(log, subStr) {
count++
}
}
if count >= occurrences {
return true, ""
}
return false, errMsg
}
25 changes: 25 additions & 0 deletions pkg/testutils/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package testutils

import (
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -66,3 +67,27 @@ func TestRaceCondition(t *testing.T) {
close(start)
finish.Wait()
}

func TestLogMatcher(t *testing.T) {
tests := []struct {
occurences int
subStr string
logs []string
expected bool
errMsg string
}{
{occurences: 1, expected: false, errMsg: "subStr '' does not occur 1 time(s) in []"},
{occurences: 1, subStr: "hi", logs: []string{"hi"}, expected: true},
{occurences: 3, subStr: "hi", logs: []string{"hi", "hi"}, expected: false, errMsg: "subStr 'hi' does not occur 3 time(s) in [hi hi]"},
{occurences: 3, subStr: "hi", logs: []string{"hi", "hi", "hi"}, expected: true},
{occurences: 1, subStr: "hi", logs: []string{"bye", "bye"}, expected: false, errMsg: "subStr 'hi' does not occur 1 time(s) in [bye bye]"},
}
for i, tt := range tests {
test := tt
t.Run(fmt.Sprintf("%v", i), func(t *testing.T) {
match, errMsg := LogMatcher(test.occurences, test.subStr, test.logs)
assert.Equal(t, test.expected, match)
assert.Equal(t, test.errMsg, errMsg)
})
}
}
28 changes: 28 additions & 0 deletions plugin/sampling/internal/calculationstrategy/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

// ProbabilityCalculator calculates the new probability given the current and target QPS
type ProbabilityCalculator interface {
Calculate(targetQPS, curQPS, prevProbability float64) (newProbability float64)
}

// CalculateFunc wraps a function of appropriate signature and makes a ProbabilityCalculator from it.
type CalculateFunc func(targetQPS, curQPS, prevProbability float64) (newProbability float64)

// Calculate implements Calculator interface.
func (c CalculateFunc) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
return c(targetQPS, curQPS, prevProbability)
}
29 changes: 29 additions & 0 deletions plugin/sampling/internal/calculationstrategy/interface_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCalculateFunc(t *testing.T) {
c := CalculateFunc(func(targetQPS, qps, oldProbability float64) float64 {
return targetQPS
})
val := 1.0
assert.Equal(t, val, c.Calculate(val, 0, 0))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

const (
defaultPercentageIncreaseCap = 0.5
)

// PercentageIncreaseCappedCalculator is a probability calculator that caps the probability
// increase to a certain percentage of the previous probability.
//
// Given prevProb = 0.1, newProb = 0.5, and cap = 0.5:
// (0.5 - 0.1)/0.1 = 400% increase. Given that our cap is 50%, the probability can only
// increase to 0.15.
//
// Given prevProb = 0.4, newProb = 0.5, and cap = 0.5:
// (0.5 - 0.4)/0.4 = 25% increase. Given that this is below our cap of 50%, the probability
// can increase to 0.5.
type PercentageIncreaseCappedCalculator struct {
percentageIncreaseCap float64
}

// NewPercentageIncreaseCappedCalculator returns a new percentage increase capped calculator.
func NewPercentageIncreaseCappedCalculator(percentageIncreaseCap float64) PercentageIncreaseCappedCalculator {
if percentageIncreaseCap == 0 {
percentageIncreaseCap = defaultPercentageIncreaseCap
}
return PercentageIncreaseCappedCalculator{
percentageIncreaseCap: percentageIncreaseCap,
}
}

// Calculate calculates the new probability.
func (c PercentageIncreaseCappedCalculator) Calculate(targetQPS, curQPS, prevProbability float64) float64 {
factor := targetQPS / curQPS
newProbability := prevProbability * factor
// If curQPS is lower than the targetQPS, we need to increase the probability slowly to
// defend against oversampling.
// Else if curQPS is higher than the targetQPS, jump directly to the newProbability to
// defend against oversampling.
if factor > 1.0 {
percentIncrease := (newProbability - prevProbability) / prevProbability
if percentIncrease > c.percentageIncreaseCap {
newProbability = prevProbability + (prevProbability * c.percentageIncreaseCap)
}
}
return newProbability
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package calculationstrategy

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPercentageIncreaseCappedCalculator(t *testing.T) {
calculator := NewPercentageIncreaseCappedCalculator(0)
tests := []struct {
targetQPS float64
curQPS float64
oldProbability float64
expectedProbability float64
testName string
}{
{1.0, 2.0, 0.1, 0.05, "test1"},
{1.0, 0.5, 0.1, 0.15, "test2"},
{1.0, 0.8, 0.1, 0.125, "test3"},
}
for _, tt := range tests {
probability := calculator.Calculate(tt.targetQPS, tt.curQPS, tt.oldProbability)
assert.InDelta(t, probability, tt.expectedProbability, 0.0001, tt.testName)
}
}
117 changes: 117 additions & 0 deletions plugin/sampling/internal/leaderelection/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leaderelection

import (
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

dl "github.com/jaegertracing/jaeger/pkg/distributedlock"
)

const (
acquireLockErrMsg = "Failed to acquire lock"
)

// ElectionParticipant partakes in leader election to become leader.
type ElectionParticipant interface {
IsLeader() bool
}

type electionParticipant struct {
ElectionParticipantOptions
lock dl.Lock
isLeader *atomic.Bool
resourceName string
closeChan chan struct{}
wg sync.WaitGroup
}

// ElectionParticipantOptions control behavior of the election participant. TODO func applyDefaults(), parameter error checking, etc.
type ElectionParticipantOptions struct {
LeaderLeaseRefreshInterval time.Duration
FollowerLeaseRefreshInterval time.Duration
Logger *zap.Logger
}

// NewElectionParticipant returns a ElectionParticipant which attempts to become leader.
func NewElectionParticipant(lock dl.Lock, resourceName string, options ElectionParticipantOptions) ElectionParticipant {
return &electionParticipant{
ElectionParticipantOptions: options,
lock: lock,
resourceName: resourceName,
isLeader: atomic.NewBool(false),
closeChan: make(chan struct{}),
}
}

// Start runs a background thread which attempts to acquire the leader lock.
func (p *electionParticipant) Start() error {
p.wg.Add(1)
go p.runAcquireLockLoop()
return nil
}

// Close implements io.Closer.
func (p *electionParticipant) Close() error {
close(p.closeChan)
p.wg.Wait()
return nil
}

// IsLeader returns true if this process is the leader.
func (p *electionParticipant) IsLeader() bool {
return p.isLeader.Load()
}

// runAcquireLockLoop attempts to acquire the leader lock. If it succeeds, it will attempt to retain it,
// otherwise it sleeps and attempts to gain the lock again.
func (p *electionParticipant) runAcquireLockLoop() {
defer p.wg.Done()
ticker := time.NewTicker(p.acquireLock())
for {
select {
case <-ticker.C:
ticker.Stop()
ticker = time.NewTicker(p.acquireLock())
case <-p.closeChan:
ticker.Stop()
return
}
}
}

// acquireLock attempts to acquire the lock and returns the interval to sleep before the next retry.
func (p *electionParticipant) acquireLock() time.Duration {
if acquiredLeaderLock, err := p.lock.Acquire(p.resourceName, p.FollowerLeaseRefreshInterval); err == nil {
p.setLeader(acquiredLeaderLock)
} else {
p.Logger.Error(acquireLockErrMsg, zap.Error(err))
}
if p.IsLeader() {
// If this process holds the leader lock, retry with a shorter cadence
// to retain the leader lease.
return p.LeaderLeaseRefreshInterval
}
// If this process failed to acquire the leader lock, retry with a longer cadence
return p.FollowerLeaseRefreshInterval
}

func (p *electionParticipant) setLeader(isLeader bool) {
p.isLeader.Store(isLeader)
}
Loading

0 comments on commit 4cc25e6

Please sign in to comment.